GitHub user robo-todd added a comment to the discussion: Join problems with 
custom TableProviders

Adding further information:
Here are the logical plans for the two joins. My custom TableProviders are 
based on the table provider example, and I'm still trying to understand why 
the join over the custom table providers returns a large number of tiny record 
batches of 1 or 2 rows each.


Plan from the MemTable test (results in 1 batch with tens of rows, depending 
on the data):
`Join(Join {
left: TableScan(TableScan { table_name: Partial { schema: "public", table: 
"values" }, source: "...", projection: None,
projected_schema: DFSchema { inner: Schema { fields: [Field { name: "id", 
data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: 
{} },
                                                      Field { name: 
"timestamp", data_type: Timestamp(Nanosecond, None), nullable: true, dict_id: 
0, dict_is_ordered: false, metadata: {} },
                                                      Field { name: "value", 
data_type: FixedSizeList(Field { name: "item", data_type: Float32, nullable: 
true, dict_id: 0, dict_is_ordered: false, metadata: {} }, 64), nullable: true, 
dict_id: 0, dict_is_ordered: false, metadata: {} }], metadata: {} },
field_qualifiers: [Some(Partial { schema: "public", table: "values" }), 
Some(Partial { schema: "public", table: "values" }), Some(Partial { schema: 
"public", table: "values" })],
functional_dependencies: FunctionalDependencies { deps: [FunctionalDependence { 
source_indices: [0], target_indices: [0, 1, 2], nullable: false, mode: Single 
}] } }, filters: [], fetch: None, .. }),
right: Filter(Filter { predicate: IsNotNull(Column(Column { relation: 
Some(Partial { schema: "public", table: "entities" }), name: "super_entity" })),
input: TableScan(TableScan { table_name: Partial { schema: "public", table: 
"entities" }, source: "...", projection: Some([0, 2]),
projected_schema: DFSchema { inner: Schema { fields: [Field { name: "id", 
data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: 
{} },
                                                      Field { name: 
"super_entity", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: 
false, metadata: {} }], metadata: {} },
field_qualifiers: [Some(Partial { schema: "public", table: "entities" }), 
Some(Partial { schema: "public", table: "entities" })],
functional_dependencies: FunctionalDependencies { deps: [FunctionalDependence { 
source_indices: [0], target_indices: [0, 1], nullable: false, mode: Single }] } 
}, filters: [],
fetch: None, .. }) }), on: [(Column(Column { relation: Some(Partial { schema: 
"public", table: "values" }), name: "id" }),Column(Column { relation: 
Some(Partial { schema: "public", table: "entities" }), name: "id" }))], filter: 
None,
join_type: Inner, join_constraint: On, schema: DFSchema { inner: Schema { 
fields: [Field { name: "id", data_type: Int32, nullable: true, dict_id: 0, 
dict_is_ordered: false, metadata: {} },
Field { name: "timestamp", data_type: Timestamp(Nanosecond, None), nullable: 
true, dict_id: 0, dict_is_ordered: false, metadata: {} },
Field { name: "value", data_type: FixedSizeList(Field { name: "item", 
data_type: Float32, nullable: true, dict_id: 0, dict_is_ordered: false, 
metadata: {} }, 64), nullable: true, dict_id: 0, dict_is_ordered: false, 
metadata: {} },
Field { name: "id", data_type: Int32, nullable: true, dict_id: 0, 
dict_is_ordered: false, metadata: {} },
Field { name: "super_entity", data_type: Int32, nullable: true, dict_id: 0, 
dict_is_ordered: false, metadata: {} }], metadata: {} },
field_qualifiers: [Some(Partial { schema: "public", table: "values" }), 
Some(Partial { schema: "public", table: "values" }), Some(Partial { schema: 
"public", table: "values" }), Some(Partial { schema: "public", table: 
"entities" }), Some(Partial { schema: "public", table: "entities" })],
functional_dependencies: FunctionalDependencies { deps: [FunctionalDependence { 
source_indices: [0], target_indices: [0, 1, 2], nullable: false, mode: Multi }, 
FunctionalDependence { source_indices: [3], target_indices: [3, 4], nullable: 
false, mode: Multi }] } }, null_equality: NullEqualsNothing })
Results in 1 batches:`

Plan from the custom TableProvider test (results in 18 batches of 1-2 rows each):


`Join(Join {
left: TableScan(TableScan { table_name: Partial { schema: "testcase", table: 
"values" }, source: "...", projection: None,
projected_schema: DFSchema { inner: Schema { fields: [Field { name: "id", 
data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, 
metadata: {} },
                                                     Field { name: "timestamp", 
data_type: Timestamp(Microsecond, None), nullable: false, dict_id: 0, 
dict_is_ordered: false, metadata: {} },
                                                     Field { name: "value", 
data_type: List(Field { name: "item", data_type: Float32, nullable: true, 
dict_id: 0, dict_is_ordered: false, metadata: {} }), nullable: true, dict_id: 
0, dict_is_ordered: false, metadata: {} }], metadata: {} },
field_qualifiers: [Some(Partial { schema: "testcase", table: "values" }), 
Some(Partial { schema: "testcase", table: "values" }), Some(Partial { schema: 
"testcase", table: "values" })],
functional_dependencies: FunctionalDependencies { deps: [FunctionalDependence { 
source_indices: [0], target_indices: [0, 1, 2], nullable: false, mode: Single 
}] } }, filters: [], fetch: None, .. }),
right: Filter(Filter { predicate: IsNotNull(Column(Column { relation: 
Some(Partial { schema: "testcase", table: "entities" }), name: "super_entity" 
})),
input: TableScan(TableScan { table_name: Partial { schema: "testcase", table: 
"entities" }, source: "...", projection: Some([0, 2]),
projected_schema: DFSchema { inner: Schema { fields: [Field { name: "id", 
data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, 
metadata: {} },
                                                      Field { name: 
"super_entity", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: 
false, metadata: {} }], metadata: {} },
field_qualifiers: [Some(Partial { schema: "testcase", table: "entities" }), 
Some(Partial { schema: "testcase", table: "entities" })],
functional_dependencies: FunctionalDependencies { deps: [FunctionalDependence { 
source_indices: [0], target_indices: [0, 1], nullable: false, mode: Single }] } 
}, filters: [],
fetch: None, .. }) }),
on: [(Column(Column { relation: Some(Partial { schema: "testcase", table: 
"values" }), name: "id" }), Column(Column { relation: Some(Partial { schema: 
"testcase", table: "entities" }), name: "id" }))], filter: None,
join_type: Inner, join_constraint: On, schema: DFSchema { inner: Schema { 
fields: [Field { name: "id", data_type: UInt64, nullable: false, dict_id: 0, 
dict_is_ordered: false, metadata: {} },
Field { name: "timestamp", data_type: Timestamp(Microsecond, None), 
nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} },
Field { name: "value", data_type: List(Field { name: "item", data_type: 
Float32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), 
nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} },
Field { name: "id", data_type: UInt64, nullable: false, dict_id: 0, 
dict_is_ordered: false, metadata: {} },
Field { name: "super_entity", data_type: UInt64, nullable: true, dict_id: 0, 
dict_is_ordered: false, metadata: {} }], metadata: {} },
field_qualifiers: [Some(Partial { schema: "testcase", table: "values" }), 
Some(Partial { schema: "testcase", table: "values" }), Some(Partial { schema: 
"testcase", table: "values" }), Some(Partial { schema: "testcase", table: 
"entities" }), Some(Partial { schema: "testcase", table: "entities" })],
functional_dependencies: FunctionalDependencies { deps: [FunctionalDependence { 
source_indices: [0], target_indices: [0, 1, 2], nullable: false, mode: Multi }, 
FunctionalDependence { source_indices: [3], target_indices: [3, 4], nullable: 
false, mode: Multi }] } }, null_equality: NullEqualsNothing })
Join in 18 batches:`
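
Both logical plans have the same shape (an inner join on `id` between a scan 
of `values` and a filtered scan of `entities`), so the difference in batch 
count does not show up at this level. One hypothesis worth checking, which 
nothing above confirms, is that the physical plan for the custom providers 
hash-partitions the rows across many partitions, with each partition then 
emitting its own small batch. The sketch below is not DataFusion code: it is a 
standard-library-only illustration of that effect, and the function name, the 
key range, and the partition count of 24 are all made up for the example.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Hypothetical helper (not a DataFusion API): route each join key to one of
/// `partitions` buckets by hash, the way a hash-repartition step would, and
/// return the row count of every non-empty bucket. Each non-empty bucket
/// stands in for one output batch.
fn batch_sizes_after_hash_partition(keys: &[u64], partitions: usize) -> Vec<usize> {
    assert!(partitions > 0, "need at least one partition");
    let mut counts = vec![0usize; partitions];
    for key in keys {
        let mut hasher = DefaultHasher::new();
        key.hash(&mut hasher);
        let idx = (hasher.finish() % partitions as u64) as usize;
        counts[idx] += 1;
    }
    // Empty partitions produce no batch at all, so drop them.
    counts.into_iter().filter(|&n| n > 0).collect()
}

fn main() {
    // Assumed input: 30 matching rows with distinct ids.
    let keys: Vec<u64> = (1..=30).collect();

    // One partition: every row lands in the same batch.
    let single = batch_sizes_after_hash_partition(&keys, 1);
    println!("1 partition:   {} batch(es), sizes {:?}", single.len(), single);

    // Many partitions: the same rows are spread thinly over many batches.
    let many = batch_sizes_after_hash_partition(&keys, 24);
    println!("24 partitions: {} batch(es), sizes {:?}", many.len(), many);
}
```

If this is what is happening, comparing the physical plans of the two tests 
(rather than the logical plans) and the number of partitions each scan reports 
should show where the two cases diverge.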

GitHub link: 
https://github.com/apache/datafusion/discussions/16981#discussioncomment-13950082

----
This is an automatically sent email for github@datafusion.apache.org.
To unsubscribe, please send an email to: 
github-unsubscr...@datafusion.apache.org

