init-js opened a new issue, #15837:
URL: https://github.com/apache/datafusion/issues/15837
### Describe the bug
Our goal is to take an existing `DataFrame` and change the parquet field ids
(after the fact) of its schema. The function `Projection::new_from_schema`
looks promising, in that it seems to be in a position to update the output
schema.
However, after executing the plan and collecting the record batches, we
notice that the field metadata on our output schema matches the original field
ids, rather than the field metadata we assigned in the outer Projection.
Should this be a supported operation?
We also considered writing the dataframe to another table (with a different
schema) as an alternative, but we would like to avoid additional copies. All we
want to change are the field ids.
A code snippet to illustrate how we wrap the DataFrame in another
LogicalPlan:
```
fn with_field_ids(original_df: DataFrame, mapping: HashMap<String, u64>) -> Result<DataFrame> {
    let (session_state, input_plan) = original_df.into_parts();

    // This transformation is elided, but all we do is copy all the fields
    // over to a new schema, and we assign new values for the field metadata
    // key `PARQUET_FIELD_ID_META_KEY`.
    let remapped_schema: Schema =
        modify_field_ids(input_plan.schema().as_arrow().clone(), ...)?;

    // The rest below "wraps" the original dataframe into a new projection.
    let remapped_df_schema: DFSchema = remapped_schema
        .to_dfschema()?
        .with_functional_dependencies(input_plan.schema().functional_dependencies().clone())?;

    // Wrap the plan into another logical plan that applies our modified schema.
    let output_plan = LogicalPlan::Projection(Projection::new_from_schema(
        Arc::new(input_plan),
        Arc::new(remapped_df_schema),
    ));

    Ok(DataFrame::new(session_state, output_plan))
}
```
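Roughly, the elided `modify_field_ids` transformation looks like the following (a simplified sketch, not the exact code; it assumes the mapping is keyed by field name and uses the `PARQUET_FIELD_ID_META_KEY` constant from the `parquet` crate):

```
use std::collections::HashMap;
use std::sync::Arc;

use arrow_schema::{Field, Fields, Schema};
use datafusion::error::Result;
use parquet::arrow::PARQUET_FIELD_ID_META_KEY; // "PARQUET:field_id"

/// Simplified sketch: copy every field into a new schema, overriding the
/// field-id metadata of each field whose name appears in `mapping`.
fn modify_field_ids(schema: Schema, mapping: &HashMap<String, u64>) -> Result<Schema> {
    let fields: Vec<Arc<Field>> = schema
        .fields()
        .iter()
        .map(|field| {
            let mut metadata = field.metadata().clone();
            if let Some(new_id) = mapping.get(field.name()) {
                metadata.insert(PARQUET_FIELD_ID_META_KEY.to_string(), new_id.to_string());
            }
            Arc::new(field.as_ref().clone().with_metadata(metadata))
        })
        .collect();
    Ok(Schema::new_with_metadata(
        Fields::from(fields),
        schema.metadata().clone(),
    ))
}
```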
### To Reproduce
I'm outputting a debug print of the resulting plan.
We apply our transformation, execute the plan (with `collect()`), and then
compare the metadata in the output schema with what we expect. What we find
instead is the original schema.
```
// ... elided: we register a table from a parquet file with a column "bucket"
// that has field id 100.
let df = ctx.sql("SELECT bucket FROM mytable").await.unwrap();

let mut remap: HashMap<String, u64> = HashMap::new();
remap.insert(BUCKET.to_string(), 111);

// This consumes the original dataframe and wraps it in a new schema
// projection where the "bucket" field id has been set to 111.
let df_remapped = with_field_ids(df, remap).expect("nothing to remap");

// shown below in ticket
dbg!("OUTPUT PLAN", &df_remapped.logical_plan());

let x = df_remapped.collect().await.expect("should not fail");
let batch = x.get(0).unwrap();
let output_schema = batch.schema();

// shown below in ticket
dbg!("OUTPUT_SCHEMA", &output_schema);

// Extract the id from the parquet metadata.
let parq_schema_descriptor = arrow_to_parquet_schema(&output_schema)
    .expect("should be able to get descriptor");
let parq_fields = parq_schema_descriptor.root_schema().get_fields();
assert_eq!(parq_fields.len(), 1);

// This fails: the original id comes to the surface. Our field id from the
// outermost projection is ignored.
assert_eq!(parq_fields[0].get_basic_info().id(), 111);
```
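The same mismatch is visible directly in the Arrow field metadata, without going through the parquet schema descriptor; this equivalent check fails the same way, because the collected batch still carries `"100"`:

```
// Equivalent check directly against the Arrow field metadata.
let field = output_schema.field(0);
assert_eq!(
    field.metadata().get("PARQUET:field_id").map(String::as_str),
    Some("111"), // fails: the batch metadata still says "100"
);
```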
### Expected behavior
We would expect the output schema, when we execute the dataframe's plan, to
reflect the "outermost" projection's schema and have `"PARQUET:field_id": "111"`.
But the inner plan's schema is used instead.
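For what it's worth, re-stamping the collected batches with the remapped schema after the fact should work without copying any data, since only the field metadata differs (a hedged workaround sketch, not what we would prefer; `restamp_batches` is a hypothetical helper, not part of the code above):

```
use std::sync::Arc;

use arrow::error::Result;
use arrow::record_batch::RecordBatch;
use arrow_schema::Schema;

/// Hypothetical workaround: re-stamp each collected batch with the remapped
/// schema. The columns are reused as-is (Arc clones, no data copies), which
/// is only sound because names, types, and nullability are unchanged.
fn restamp_batches(
    batches: Vec<RecordBatch>,
    remapped_schema: Arc<Schema>,
) -> Result<Vec<RecordBatch>> {
    batches
        .into_iter()
        .map(|batch| RecordBatch::try_new(remapped_schema.clone(), batch.columns().to_vec()))
        .collect()
}
```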
### Additional context
Note that the outer logical plan's schema has a field id of 111
(`"PARQUET:field_id": "111"`), which is the output we want,
while the inner logical plan's schema has the original field id
(`"PARQUET:field_id": "100"`).
```
[src/schema/mod.rs:599:9] &df_remapped.logical_plan() = Projection(
Projection {
expr: [
Column(
Column {
relation: None,
name: "bucket",
},
),
],
input: Projection(
Projection {
expr: [
Column(
Column {
relation: Some(
Bare {
table: "mytable",
},
),
name: "bucket",
},
),
],
input: TableScan(
TableScan {
table_name: Bare {
table: "mytable",
},
source: "...",
projection: None,
projected_schema: DFSchema {
inner: Schema {
fields: [
Field {
name: "bucket",
data_type: Utf8,
nullable: false,
dict_id: 0,
dict_is_ordered: false,
metadata: {
"PARQUET:field_id": "100",
},
},
Field {
name: "key",
data_type: Utf8,
nullable: false,
dict_id: 0,
dict_is_ordered: false,
metadata: {
"PARQUET:field_id": "200",
},
},
],
metadata: {},
},
field_qualifiers: [
Some(
Bare {
table: "mytable",
},
),
Some(
Bare {
table: "mytable",
},
),
Some(
Bare {
table: "mytable",
},
),
Some(
Bare {
table: "mytable",
},
),
Some(
Bare {
table: "mytable",
},
),
Some(
Bare {
table: "mytable",
},
),
],
functional_dependencies: FunctionalDependencies {
deps: [],
},
},
filters: [],
fetch: None,
..
},
),
schema: DFSchema {
inner: Schema {
fields: [
Field {
name: "bucket",
data_type: Utf8,
nullable: false,
dict_id: 0,
dict_is_ordered: false,
metadata: {
"PARQUET:field_id": "100",
},
},
],
metadata: {},
},
field_qualifiers: [
Some(
Bare {
table: "mytable",
},
),
],
functional_dependencies: FunctionalDependencies {
deps: [],
},
},
},
),
schema: DFSchema {
inner: Schema {
fields: [
Field {
name: "bucket",
data_type: Utf8,
nullable: false,
dict_id: 0,
dict_is_ordered: false,
metadata: {
"PARQUET:field_id": "111",
},
},
],
metadata: {},
},
field_qualifiers: [
None,
],
functional_dependencies: FunctionalDependencies {
deps: [],
},
},
},
)
```
And the output schema of the record batches, after we `collect()` the
DataFrame, shows the innermost metadata, not the outer projection's:
```
[src/schema/mod.rs:604:9] &output_schema = Schema {
fields: [
Field {
name: "bucket",
data_type: Utf8,
nullable: false,
dict_id: 0,
dict_is_ordered: false,
metadata: {
"PARQUET:field_id": "100",
},
},
],
metadata: {},
}
```