GitHub user robo-todd added a comment to the discussion: Join problems with custom TableProviders
Adding further information: here are the logical plans of the two joins. My custom TableProviders are based on the table provider example, and I'm still trying to understand why the join over the custom table providers returns a large number of tiny record batches of only 1 or 2 rows each.

Plan from the `MemTable` test (returns 1 batch with tens of rows, depending on the data):

`Join(Join { left: TableScan(TableScan { table_name: Partial { schema: "public", table: "values" }, source: "...", projection: None, projected_schema: DFSchema { inner: Schema { fields: [Field { name: "id", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "timestamp", data_type: Timestamp(Nanosecond, None), nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "value", data_type: FixedSizeList(Field { name: "item", data_type: Float32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, 64), nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }], metadata: {} }, field_qualifiers: [Some(Partial { schema: "public", table: "values" }), Some(Partial { schema: "public", table: "values" }), Some(Partial { schema: "public", table: "values" })], functional_dependencies: FunctionalDependencies { deps: [FunctionalDependence { source_indices: [0], target_indices: [0, 1, 2], nullable: false, mode: Single }] } }, filters: [], fetch: None, .. }), right: Filter(Filter { predicate: IsNotNull(Column(Column { relation: Some(Partial { schema: "public", table: "entities" }), name: "super_entity" })), input: TableScan(TableScan { table_name: Partial { schema: "public", table: "entities" }, source: "...", projection: Some([0, 2]), projected_schema: DFSchema { inner: Schema { fields: [Field { name: "id", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "super_entity", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }], metadata: {} }, field_qualifiers: [Some(Partial { schema: "public", table: "entities" }), Some(Partial { schema: "public", table: "entities" })], functional_dependencies: FunctionalDependencies { deps: [FunctionalDependence { source_indices: [0], target_indices: [0, 1], nullable: false, mode: Single }] } }, filters: [], fetch: None, .. }) }), on: [(Column(Column { relation: Some(Partial { schema: "public", table: "values" }), name: "id" }),Column(Column { relation: Some(Partial { schema: "public", table: "entities" }), name: "id" }))], filter: None, join_type: Inner, join_constraint: On, schema: DFSchema { inner: Schema { fields: [Field { name: "id", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "timestamp", data_type: Timestamp(Nanosecond, None), nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "value", data_type: FixedSizeList(Field { name: "item", data_type: Float32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, 64), nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "id", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "super_entity", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }], metadata: {} }, field_qualifiers: [Some(Partial { schema: "public", table: "values" }), Some(Partial { schema: "public", table: "values" }), Some(Partial { schema: "public", table: "values" }), Some(Partial { schema: "public", table: "entities" }), Some(Partial { schema: "public", table: "entities" })], functional_dependencies: FunctionalDependencies { deps: [FunctionalDependence { source_indices: [0], target_indices: [0, 1, 2], nullable: false, mode: Multi }, FunctionalDependence { source_indices: [3], target_indices: [3, 4], nullable: false, mode: Multi }] } }, null_equality: NullEqualsNothing })`

Plan from the custom TableProvider test (returns 18 batches of 1 or 2 rows each):

`Join(Join { left: TableScan(TableScan { table_name: Partial { schema: "testcase", table: "values" }, source: "...", projection: None, projected_schema: DFSchema { inner: Schema { fields: [Field { name: "id", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "timestamp", data_type: Timestamp(Microsecond, None), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "value", data_type: List(Field { name: "item", data_type: Float32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }], metadata: {} }, field_qualifiers: [Some(Partial { schema: "testcase", table: "values" }), Some(Partial { schema: "testcase", table: "values" }), Some(Partial { schema: "testcase", table: "values" })], functional_dependencies: FunctionalDependencies { deps: [FunctionalDependence { source_indices: [0], target_indices: [0, 1, 2], nullable: false, mode: Single }] } }, filters: [], fetch: None, .. }), right: Filter(Filter { predicate: IsNotNull(Column(Column { relation: Some(Partial { schema: "testcase", table: "entities" }), name: "super_entity" })), input: TableScan(TableScan { table_name: Partial { schema: "testcase", table: "entities" }, source: "...", projection: Some([0, 2]), projected_schema: DFSchema { inner: Schema { fields: [Field { name: "id", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "super_entity", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }], metadata: {} }, field_qualifiers: [Some(Partial { schema: "testcase", table: "entities" }), Some(Partial { schema: "testcase", table: "entities" })], functional_dependencies: FunctionalDependencies { deps: [FunctionalDependence { source_indices: [0], target_indices: [0, 1], nullable: false, mode: Single }] } }, filters: [], fetch: None, .. }) }), on: [(Column(Column { relation: Some(Partial { schema: "testcase", table: "values" }), name: "id" }), Column(Column { relation: Some(Partial { schema: "testcase", table: "entities" }), name: "id" }))], filter: None, join_type: Inner, join_constraint: On, schema: DFSchema { inner: Schema { fields: [Field { name: "id", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "timestamp", data_type: Timestamp(Microsecond, None), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "value", data_type: List(Field { name: "item", data_type: Float32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "id", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "super_entity", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }], metadata: {} }, field_qualifiers: [Some(Partial { schema: "testcase", table: "values" }), Some(Partial { schema: "testcase", table: "values" }), Some(Partial { schema: "testcase", table: "values" }), Some(Partial { schema: "testcase", table: "entities" }), Some(Partial { schema: "testcase", table: "entities" })], functional_dependencies: FunctionalDependencies { deps: [FunctionalDependence { source_indices: [0], target_indices: [0, 1, 2], nullable: false, mode: Multi }, FunctionalDependence { source_indices: [3], target_indices: [3, 4], nullable: false, mode: Multi }] } }, null_equality: NullEqualsNothing })`

Apart from the schema name (`public` vs. `testcase`) and the column types and nullability (`Int32` vs. `UInt64` ids, `Timestamp(Nanosecond, None)` vs. `Timestamp(Microsecond, None)`, `FixedSizeList(Float32, 64)` vs. `List(Float32)` values), the two logical plans are identical.

GitHub link: https://github.com/apache/datafusion/discussions/16981#discussioncomment-13950082
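For illustration only: one generic way to get a few larger batches, no matter how finely upstream operators chunk their output, is to buffer small batches and flush once a target row count is reached, which is the idea behind DataFusion's `CoalesceBatchesExec` operator. The sketch below uses only the Rust standard library and rests on stated assumptions: `Batch` is a hypothetical stand-in for an Arrow `RecordBatch` (just a `Vec<u64>` of rows), and `coalesce` is a hypothetical helper, not a DataFusion API. The 8192-row target mirrors DataFusion's default `datafusion.execution.batch_size`.

```rust
/// Hypothetical stand-in for an Arrow `RecordBatch`: only the rows, no schema.
type Batch = Vec<u64>;

/// Hypothetical helper: merges small batches, in order, into batches of at
/// least `target_rows` rows. Only the final batch may be smaller.
fn coalesce(batches: Vec<Batch>, target_rows: usize) -> Vec<Batch> {
    assert!(target_rows > 0, "target_rows must be positive");
    let mut out = Vec::new();
    let mut buffer: Batch = Vec::new();
    for batch in batches {
        // Empty input batches contribute nothing; skip them.
        if batch.is_empty() {
            continue;
        }
        buffer.extend(batch);
        // Flush once enough rows have accumulated.
        if buffer.len() >= target_rows {
            out.push(std::mem::take(&mut buffer));
        }
    }
    // Emit whatever is left over as a final, possibly smaller, batch.
    if !buffer.is_empty() {
        out.push(buffer);
    }
    out
}

fn main() {
    // 18 tiny batches of 1 or 2 rows, shaped like the custom-provider output.
    let tiny: Vec<Batch> = (0..18u64)
        .map(|i| if i % 2 == 0 { vec![i] } else { vec![i, i + 100] })
        .collect();
    let total_rows: usize = tiny.iter().map(Vec::len).sum();
    let merged = coalesce(tiny, 8192);
    // prints "27 rows in 1 batch(es)"
    println!("{} rows in {} batch(es)", total_rows, merged.len());
}
```

With real Arrow data, already-collected batches can be merged with `arrow::compute::concat_batches`. Either approach only changes how the rows are chunked; it does not explain why the custom providers produce tiny batches in the first place.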