[ https://issues.apache.org/jira/browse/ARROW-10844?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17246101#comment-17246101 ]
Jorge Leitão commented on ARROW-10844:
--------------------------------------

I re-opened the issue with a complete example. There is indeed a problem with how we share context state between DataFrames.

cc [~alamb], as you will probably be affected by this.

> [Rust] [DataFusion] join of two DataFrames is not possible
> ----------------------------------------------------------
>
>                 Key: ARROW-10844
>                 URL: https://issues.apache.org/jira/browse/ARROW-10844
>             Project: Apache Arrow
>          Issue Type: Bug
>          Components: Rust, Rust - DataFusion
>    Affects Versions: 3.0.0
>            Reporter: Jorge Leitão
>            Priority: Blocker
>             Fix For: 3.0.0
>
>
> The complete failing test:
>
> {code:java}
> use std::sync::Arc;
> use arrow::{array::{Int32Array, StringArray}, record_batch::RecordBatch};
> use arrow::datatypes::{DataType, Field, Schema};
> use datafusion::{datasource::MemTable, prelude::JoinType};
> use datafusion::error::Result;
> use datafusion::execution::context::ExecutionContext;
>
> #[tokio::test]
> async fn join() -> Result<()> {
>     let schema1 = Arc::new(Schema::new(vec![
>         Field::new("a", DataType::Utf8, false),
>         Field::new("b", DataType::Int32, false),
>     ]));
>     let schema2 = Arc::new(Schema::new(vec![
>         Field::new("a", DataType::Utf8, false),
>         Field::new("c", DataType::Int32, false),
>     ]));
>     // define data.
>     let batch1 = RecordBatch::try_new(
>         schema1.clone(),
>         vec![
>             Arc::new(StringArray::from(vec!["a", "b", "c", "d"])),
>             Arc::new(Int32Array::from(vec![1, 10, 10, 100])),
>         ],
>     )?;
>     // define data.
>     let batch2 = RecordBatch::try_new(
>         schema2.clone(),
>         vec![
>             Arc::new(StringArray::from(vec!["a", "b", "c", "d"])),
>             Arc::new(Int32Array::from(vec![1, 10, 10, 100])),
>         ],
>     )?;
>     let mut ctx = ExecutionContext::new();
>     let table1 = MemTable::new(schema1, vec![vec![batch1]])?;
>     let table2 = MemTable::new(schema2, vec![vec![batch2]])?;
>     ctx.register_table("aa", Box::new(table1));
>     let df1 = ctx.table("aa")?;
>     ctx.register_table("aaa", Box::new(table2));
>     let df2 = ctx.table("aaa")?;
>     let a = df1.join(df2, JoinType::Inner, &["a"], &["a"])?;
>     let batches = a.collect().await?;
>     assert_eq!(batches.len(), 1);
>     Ok(())
> }
> {code}
>
> When DataFrames are created via `ctx.table`, they receive a clone of the
> {{ExecutionContextState}}. If the context later receives a new table, that
> table will not be part of the state held by the first DataFrame. On a join,
> the left DataFrame's state is passed to the newly created DataFrame, which
> is then used in collect(). Because the right side refers to a table that is
> not in the state of the left, the execution fails.
>
> We may need an Arc<Mutex<{{ExecutionContextState}}>> to share a common
> mutable state across multiple DataFrames. Alternatively, we could stop
> requiring tables to be registered in the context before DataFrames can use
> them.
> Note that the current example in `DataFrame::join` docs works because the
> table is registered for both DataFrames.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
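
For illustration only, the difference between a cloned state and a shared state described in the issue can be sketched with the standard library alone. This is not DataFusion code: `State`, `SnapshotFrame`, and `SharedFrame` are hypothetical stand-ins for `ExecutionContextState` and `DataFrame`, and the table registry is reduced to a map from table name to row count.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

// Hypothetical stand-in for ExecutionContextState: table name -> row count.
#[derive(Clone, Default)]
struct State {
    tables: HashMap<String, usize>,
}

// Snapshot design: the frame owns a clone of the state taken at creation time.
struct SnapshotFrame {
    state: State,
}

// Shared design: the frame holds a handle to the same mutable state as the context.
struct SharedFrame {
    state: Arc<Mutex<State>>,
}

impl SnapshotFrame {
    fn can_resolve(&self, name: &str) -> bool {
        self.state.tables.contains_key(name)
    }
}

impl SharedFrame {
    fn can_resolve(&self, name: &str) -> bool {
        self.state.lock().unwrap().tables.contains_key(name)
    }
}

// Returns whether a frame created after registering "aa" can see "aa" and "aaa".
fn snapshot_demo() -> (bool, bool) {
    let mut ctx = State::default();
    ctx.tables.insert("aa".to_string(), 4);
    let df1 = SnapshotFrame { state: ctx.clone() };
    // Registered after df1 was created, so df1's clone never sees it.
    ctx.tables.insert("aaa".to_string(), 4);
    (df1.can_resolve("aa"), df1.can_resolve("aaa"))
}

// Same sequence of steps, but the frame shares the context's state.
fn shared_demo() -> (bool, bool) {
    let ctx = Arc::new(Mutex::new(State::default()));
    ctx.lock().unwrap().tables.insert("aa".to_string(), 4);
    let df1 = SharedFrame { state: Arc::clone(&ctx) };
    // Registered after df1 was created, but visible through the shared handle.
    ctx.lock().unwrap().tables.insert("aaa".to_string(), 4);
    (df1.can_resolve("aa"), df1.can_resolve("aaa"))
}

fn main() {
    println!("snapshot: {:?}", snapshot_demo());
    println!("shared:   {:?}", shared_demo());
}
```

In this sketch the snapshot frame cannot resolve the later table while the shared frame can, which mirrors why a joined plan built from the left DataFrame's cloned state would fail to find the right side's table. Whether a `Mutex`, an `RwLock`, or removing the registration requirement is the better fit for DataFusion is left open here.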