[ 
https://issues.apache.org/jira/browse/ARROW-10844?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17246101#comment-17246101
 ] 

Jorge Leitão commented on ARROW-10844:
--------------------------------------

I re-opened the issue with a complete example. There is indeed a problem with 
how we share context state between DataFrames. cc [~alamb], as you are probably 
affected by this.

> [Rust] [DataFusion] join of two DataFrames is not possible
> ----------------------------------------------------------
>
>                 Key: ARROW-10844
>                 URL: https://issues.apache.org/jira/browse/ARROW-10844
>             Project: Apache Arrow
>          Issue Type: Bug
>          Components: Rust, Rust - DataFusion
>    Affects Versions: 3.0.0
>            Reporter: Jorge Leitão
>            Priority: Blocker
>             Fix For: 3.0.0
>
>
> The complete failing test:
>  
> {code:java}
> use std::sync::Arc;
> use arrow::{array::{Int32Array, StringArray}, record_batch::RecordBatch};
> use arrow::datatypes::{DataType, Field, Schema};
> use datafusion::{datasource::MemTable, prelude::JoinType};
> use datafusion::error::Result;
> use datafusion::execution::context::ExecutionContext;
> #[tokio::test]
> async fn join() -> Result<()> {
>     let schema1 = Arc::new(Schema::new(vec![
>         Field::new("a", DataType::Utf8, false),
>         Field::new("b", DataType::Int32, false),
>     ]));
>     let schema2 = Arc::new(Schema::new(vec![
>         Field::new("a", DataType::Utf8, false),
>         Field::new("c", DataType::Int32, false),
>     ]));
>     // define data.
>     let batch1 = RecordBatch::try_new(
>         schema1.clone(),
>         vec![
>             Arc::new(StringArray::from(vec!["a", "b", "c", "d"])),
>             Arc::new(Int32Array::from(vec![1, 10, 10, 100])),
>         ],
>     )?;
>     // define data.
>     let batch2 = RecordBatch::try_new(
>         schema2.clone(),
>         vec![
>             Arc::new(StringArray::from(vec!["a", "b", "c", "d"])),
>             Arc::new(Int32Array::from(vec![1, 10, 10, 100])),
>         ],
>     )?;
>     let mut ctx = ExecutionContext::new();
>     let table1 = MemTable::new(schema1, vec![vec![batch1]])?;
>     let table2 = MemTable::new(schema2, vec![vec![batch2]])?;
>     ctx.register_table("aa", Box::new(table1));
>     let df1 = ctx.table("aa")?;
>     ctx.register_table("aaa", Box::new(table2));
>     let df2 = ctx.table("aaa")?;
>     let a = df1.join(df2, JoinType::Inner, &["a"], &["a"])?;
>     let batches = a.collect().await?;
>     assert_eq!(batches.len(), 1);
>     Ok(())
> }
> {code}
>  
> When DataFrames are created via `ctx.table`, they receive a clone of the 
> {{ExecutionContextState}}. If the context later receives a new table, that 
> table will not be part of the state held by the first DataFrame. On a join, 
> the left DataFrame's state is passed to the newly created DataFrame, and that 
> state is then used in {{collect()}}. Because the right side refers to a table 
> that is not in the left side's state, execution fails.
>  
> We may need an {{Arc<Mutex<ExecutionContextState>>}} to share a common 
> mutable state across multiple DataFrames. Alternatively, we could stop 
> requiring tables to be registered in the context before DataFrames can use 
> them.
>  
> Note that the current example in the `DataFrame::join` docs works because the 
> table is registered for both DataFrames.
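
The snapshot behaviour described above can be illustrated without DataFusion. The following is only a sketch using the standard library: `State`, `SnapshotFrame`, `SharedFrame` and `check` are hypothetical stand-ins, not DataFusion types, and the "state" is reduced to a set of registered table names. It assumes, as the description says, that a join keeps the left side's state.

```rust
use std::collections::HashSet;
use std::sync::{Arc, Mutex};

// Hypothetical stand-in for the context state: only the registered table names.
#[derive(Clone, Default)]
struct State {
    tables: HashSet<String>,
}

// Fails if any scanned table is missing from the given set of tables.
fn check(tables: &HashSet<String>, scans: &[String]) -> Result<(), String> {
    match scans.iter().find(|t| !tables.contains(*t)) {
        Some(t) => Err(format!("table '{}' not found", t)),
        None => Ok(()),
    }
}

// A frame that owns a clone of the state, taken when the frame was created.
struct SnapshotFrame {
    state: State,
    scans: Vec<String>,
}

impl SnapshotFrame {
    // The joined frame keeps the left side's state (assumption from the text).
    fn join(mut self, right: SnapshotFrame) -> SnapshotFrame {
        self.scans.extend(right.scans);
        self
    }

    fn collect(&self) -> Result<(), String> {
        check(&self.state.tables, &self.scans)
    }
}

// A frame that shares one mutable state with the context and other frames.
struct SharedFrame {
    state: Arc<Mutex<State>>,
    scans: Vec<String>,
}

impl SharedFrame {
    fn join(mut self, right: SharedFrame) -> SharedFrame {
        self.scans.extend(right.scans);
        self
    }

    fn collect(&self) -> Result<(), String> {
        let state = self.state.lock().unwrap();
        check(&state.tables, &self.scans)
    }
}

// Mirrors the failing test: "aaa" is registered after df1 took its snapshot.
fn snapshot_join() -> Result<(), String> {
    let mut ctx = State::default();
    ctx.tables.insert("aa".to_string());
    let df1 = SnapshotFrame { state: ctx.clone(), scans: vec!["aa".to_string()] };
    ctx.tables.insert("aaa".to_string());
    let df2 = SnapshotFrame { state: ctx.clone(), scans: vec!["aaa".to_string()] };
    df1.join(df2).collect()
}

// Same sequence, but every frame sees later registrations through the Arc.
fn shared_join() -> Result<(), String> {
    let ctx = Arc::new(Mutex::new(State::default()));
    ctx.lock().unwrap().tables.insert("aa".to_string());
    let df1 = SharedFrame { state: Arc::clone(&ctx), scans: vec!["aa".to_string()] };
    ctx.lock().unwrap().tables.insert("aaa".to_string());
    let df2 = SharedFrame { state: Arc::clone(&ctx), scans: vec!["aaa".to_string()] };
    df1.join(df2).collect()
}
```

In this model `snapshot_join()` returns an error because the left frame's snapshot never saw "aaa", while `shared_join()` succeeds. Whether a shared mutex is the right fix for DataFusion itself is a separate question; the sketch only shows why the ordering of `register_table` and `ctx.table` matters under the cloning behaviour.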



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
