[Rust][DataFusion] Make DataFrame state public
Hi,

Looking for comments / your view. Would it be possible to:

1. patch DataFusion to make `df.state` public, and
2. patch DataFusion to add a method to DataFrame, e.g. `df.transform_logical_plan(mut self, new_plan) -> df`, where the original plan could be modified or injected with a NewPlanNode (UserDefinedPlanNode)?

Reason: I'm working on a "writer to a Kafka topic" on top of DataFusion using Ballista. To get proper distribution I need to change the DataFrame output so it is processed/sent on each executor. To do this currently I need access to both the DataFrame and the context: I need to get the state to change the DataFrame on the fly and inject it with my own UserDefinedLogicalNode.

The current code works, but looks a little "messy":

```rust
df.write(ballista_ctx, "kafka://topic:port?brokers", Format::JSON);
```

If I had public access to `df.state`, it could look like:

```rust
df.write_json("kafka://topic:port?brokers");
```

Cheers,
Jaro
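To make the proposal concrete, here is a minimal std-only sketch of the API shape I have in mind. The `LogicalPlan` enum, the `Extension` variant, and the `DataFrame` type below are simplified stand-ins, not the real DataFusion types:

```rust
// Stand-in for DataFusion's logical plan (the real one is a large enum
// in datafusion::logical_expr).
#[derive(Debug, Clone, PartialEq)]
enum LogicalPlan {
    TableScan(String),
    // A user-defined extension node wrapping the original plan.
    Extension { name: String, input: Box<LogicalPlan> },
}

struct DataFrame {
    plan: LogicalPlan,
}

impl DataFrame {
    fn new(plan: LogicalPlan) -> Self {
        Self { plan }
    }

    // The proposed method: consume the frame, let the caller rewrite the
    // plan (e.g. inject a UserDefinedLogicalNode), return a new frame.
    fn transform_logical_plan<F>(self, f: F) -> DataFrame
    where
        F: FnOnce(LogicalPlan) -> LogicalPlan,
    {
        DataFrame::new(f(self.plan))
    }
}

fn main() {
    let df = DataFrame::new(LogicalPlan::TableScan("example".into()));
    // Wrap the original plan in a hypothetical "KafkaWriter" extension node.
    let df = df.transform_logical_plan(|plan| LogicalPlan::Extension {
        name: "KafkaWriter".into(),
        input: Box::new(plan),
    });
    println!("{:?}", df.plan);
}
```

With something like this, the writer would not need the context at all: it would only rewrite the frame's own plan.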
[RUST][Ballista] UDF/UDAF in Ballista
Hi,

Quick question: do UDFs/UDAFs work in Ballista? I saw a "TODO" in the executor part:

```rust
// TODO add logic to dynamically load UDF/UDAFs libs from files
scalar_functions: HashMap::new(),
aggregate_functions: HashMap::new(),
```

Creating an example library and adding the loading functionality here looks "simple enough". However, I don't know how this would work with the client/scheduler - resolving the logical plan and then converting logical to physical. Not to mention how it would be passed through gRPC? I feel confused.

For a "hack version" - should I register the UDF library in all of the client/scheduler/executors?

I will appreciate any help / pointers - learning is fun ;-)

Cheers,
Jaro
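For the "hack version" the idea would be that every process runs the same registration code at startup, so any node can resolve a function by name. A minimal std-only sketch of that registry (using a plain `fn` pointer as a stand-in for DataFusion's real `ScalarUDF`, which is registered via `SessionContext::register_udf`):

```rust
use std::collections::HashMap;

// Stand-in for a scalar UDF implementation: a function over i64 arguments.
type ScalarFn = fn(&[i64]) -> i64;

// Minimal registry mirroring the executor's `scalar_functions` map.
#[derive(Default)]
struct FunctionRegistry {
    scalar_functions: HashMap<String, ScalarFn>,
}

impl FunctionRegistry {
    fn register(&mut self, name: &str, f: ScalarFn) {
        // Store lowercased so lookups are case-insensitive, like SQL names.
        self.scalar_functions.insert(name.to_lowercase(), f);
    }

    fn lookup(&self, name: &str) -> Option<&ScalarFn> {
        self.scalar_functions.get(&name.to_lowercase())
    }
}

fn add_one(args: &[i64]) -> i64 {
    args[0] + 1
}

fn main() {
    // Client, scheduler, and executor would each run this same
    // registration at startup, so a plan naming "add_one" resolves anywhere.
    let mut registry = FunctionRegistry::default();
    registry.register("add_one", add_one);

    let f = registry.lookup("ADD_ONE").expect("function registered");
    println!("{}", f(&[41])); // prints 42
}
```

The gRPC question then reduces to sending only the function *name* in the plan, since each node can look up the implementation locally.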
Re: [RUST][Datafusion][Ballista] Writer
Hi,

Thanks for your response. Sorry - my bad, I didn't specify it clearly. However, I will check your solution.

What I'm looking for is Ballista - I need a distributed version of export/save. Currently in Ballista you can only read, e.g. from S3 (MinIO)/HDFS, but after processing I need to save the output ... put it back to S3. At the moment I figure the easiest way will probably be to use object_store.

From what I see it should be done by the executors, not the driver - that's why I started thinking about the logical plan.

Best Regards,
Jaro

On Sat, Apr 1, 2023 at 9:23 PM Metehan Yıldırım wrote:
>
> Hi,
>
> As far as I know, exporting data from a SQL database to a CSV file or other
> external file format is typically not considered part of the logical plan
> for executing a SQL query.
>
> At present, I am developing a table sink feature in DataFusion, where I
> have successfully added new APIs (insert_into and copy_to) to the
> TableProvider trait. Although I have not yet submitted the PR, the new APIs
> are functioning well.
>
> Listing table INSERT INTO support (WAITING ARROW / OBJECT STORE UPDATE) by
> metesynnada · Pull Request #62 · synnada-ai/arrow-datafusion (github.com)
> <https://github.com/synnada-ai/arrow-datafusion/pull/62>
>
> It should be noted that the object_store crate is primarily responsible for
> the main functionality of the table sink feature. It provides ample support
> for file sinking related to listing tables. If you are looking for support
> beyond this, I'd like to hear about your use case.
>
> Mete.
>
> On Sat, Apr 1, 2023 at 11:07 PM Jaroslaw Nowosad wrote:
>
> > Hi,
> >
> > Looking for advice:
> > I'm looking into creating a writer part for Ballista.
> > There is a data source but not a sink.
> > I started looking into object store -> put/put_multipart.
> > But it looks like a simple context extension is not enough - do I need to
> > extend the logical/physical plan?
> >
> > If you have any pointers...
> >
> > Best Regards,
> > Jaro
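The "executors, not the driver" point is the crux: each executor writes its own partition under its own key. A minimal std-only sketch of that sink shape, with an in-memory map standing in for the object_store crate (whose real `ObjectStore::put` is async and takes a path plus bytes):

```rust
use std::collections::HashMap;

// Stand-in for the object_store `put` interface.
trait ObjectSink {
    fn put(&mut self, location: &str, bytes: Vec<u8>) -> Result<(), String>;
}

// In-memory implementation; an S3-backed one would go through object_store.
#[derive(Default)]
struct InMemorySink {
    objects: HashMap<String, Vec<u8>>,
}

impl ObjectSink for InMemorySink {
    fn put(&mut self, location: &str, bytes: Vec<u8>) -> Result<(), String> {
        self.objects.insert(location.to_string(), bytes);
        Ok(())
    }
}

fn main() {
    // Each executor writes its own partition to a distinct key - this is
    // why the sink has to live on the executors rather than the driver.
    let mut sink = InMemorySink::default();
    for partition in 0..3 {
        let key = format!("output/part-{partition}.json");
        sink.put(&key, format!("{{\"partition\":{partition}}}").into_bytes())
            .unwrap();
    }
    println!("{} objects written", sink.objects.len()); // 3 objects written
}
```

The `part-N` key naming here is just illustrative, but it shows why only a plan-level change (not a context extension) can hand each executor its own slice of the output.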
[RUST][Datafusion][Ballista] Writer
Hi,

Looking for advice: I'm looking into creating a writer part for Ballista. There is a data source but not a sink. I started looking into object store -> put/put_multipart. But it looks like a simple context extension is not enough - do I need to extend the logical/physical plan?

If you have any pointers...

Best Regards,
Jaro
Re: [RUST][Datafusion] SQL UDF in Datafusion
Thanks Jeremy!

Yes, I need some time to dig in - i.e. I need to figure out how to divide my problem into smaller tasks. Thanks for DaskParser - this is exactly where I want to start ... probably with some simple new SQL statement. I'd really appreciate any details, e.g. I need to find out how to register functions and retrieve them later. If you have any more suggestions or thoughts - please share.

Thanks,
Jaro
yare...@gmail.com

On Thu, Jan 12, 2023 at 3:46 PM Jeremy Dyer wrote:

> Hey Jaro,
>
> While not written in Java, nor a UDF, there are some examples in [1]
> dask-sql (Python based) where we do this to extend DataFusion for custom
> grammars - CREATE MODEL, for example. In a nutshell, you want to write some
> Rust code that extends the DataFusion parser and then performs any binding
> logic required when your custom UDF statement is encountered. The
> processing chain is a little lengthy to follow, but you can see where it
> starts [2] here. The `DaskParser` maintains a member which is the
> DataFusion parser itself. Happy to give more details - just wanted to give
> you a place to start looking.
>
> Thanks,
> Jeremy Dyer
>
> [1] - https://github.com/dask-contrib/dask-sql
> [2] - https://github.com/dask-contrib/dask-sql/blob/main/dask_planner/src/parser.rs#L385
>
> On Thu, Jan 12, 2023 at 10:36 AM Jaroslaw Nowosad wrote:
>
> > Hi all,
> >
> > I had a task to investigate how to extend DataFusion to add UDFs written
> > in plain SQL.
> > Reason behind it: there is quite a big bunch of SQL UDFs in existing Java
> > (Spark) solutions; however, we are starting to move into the Rust
> > ecosystem, and DataFusion/Arrow/Ballista looks like the proper way.
> >
> > Question:
> > Could I get some pointers on how to extend DF to add "CREATE FUNCTION AAA
> > (p1:int, p2: int) RETURN INT AS '
> >
> > I saw some rewrite propositions, extending the SQL parser with a new
> > command or creating a separate parser/dialect.
> >
> > Best Regards,
> > Jaro
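For my own notes, the DaskParser pattern boils down to: peek at the SQL text for the custom statement, otherwise delegate to the inner parser. A minimal std-only sketch (everything here is a stand-in; the real code wraps sqlparser's `Parser` and its token stream rather than doing string matching):

```rust
// Stand-in result type: either our custom statement or whatever the
// inner (DataFusion) parser would have produced.
#[derive(Debug, PartialEq)]
enum Statement {
    CreateFunction { name: String, body: String },
    Standard(String),
}

fn parse(sql: &str) -> Statement {
    let upper = sql.trim_start().to_uppercase();
    if let Some(rest) = upper.strip_prefix("CREATE FUNCTION ") {
        // Grab the function name; keep the body after AS verbatim.
        let name = rest.split_whitespace().next().unwrap_or("").to_string();
        let body = sql
            .splitn(2, " AS ")
            .nth(1)
            .unwrap_or("")
            .trim()
            .to_string();
        Statement::CreateFunction { name, body }
    } else {
        // Fall back to the standard parser (here just wrapped as-is).
        Statement::Standard(sql.to_string())
    }
}

fn main() {
    println!("{:?}", parse("CREATE FUNCTION AAA (p1 INT) RETURN INT AS 'p1 + 1'"));
    println!("{:?}", parse("SELECT 1"));
}
```

Registering the parsed function body in a name-keyed map would then cover the "register functions and retrieve them later" part.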
[RUST][Datafusion] SQL UDF in Datafusion
Hi all,

I had a task to investigate how to extend DataFusion to add UDFs written in plain SQL. Reason behind it: there is quite a big bunch of SQL UDFs in existing Java (Spark) solutions; however, we are starting to move into the Rust ecosystem, and DataFusion/Arrow/Ballista looks like the proper way.

Question: could I get some pointers on how to extend DF to add "CREATE FUNCTION AAA (p1:int, p2: int) RETURN INT AS '
[Datafusion] Streaming - integration with kafka - kafka_writer
Hi,

I am trying to integrate DataFusion with Kafka; the final goal is to have end-to-end streaming. But I started from a "different side" -> step 1 is to publish output to Kafka, so I copied code / created a Kafka publisher:
https://github.com/yarenty/arrow-datafusion/tree/master/datafusion/core/src/physical_plan/kafka

The test case is here:
https://github.com/yarenty/arrow-datafusion/blob/master/datafusion/core/tests/ordered_sql_to_kafka.rs

It all finishes with something like this:

```rust
#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    ctx.register_csv("example", "tests/example.csv", CsvReadOptions::new())
        .await?;
    let df = ctx
        .sql("SELECT a, MIN(b) as bmin FROM example GROUP BY a ORDER BY a LIMIT 100")
        .await?;

    // kafka context
    let stream_ctx = KafkaContext::with_config(
        KafkaConfig::new("test_topic")
            .set("bootstrap.servers", "127.0.0.1:9092")
            .set("compression.codec", "snappy"),
    );

    df.publish_to_kafka(stream_ctx).await?;
    Ok(())
}
```

I'm still not sure if this is the correct way to do it and if I put the code in the proper places ... still: learning something new every day. Is there any other place where I can share code / check ideas?

Jaro
yare...@gmail.com
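Under the hood, `publish_to_kafka` boils down to: for each output partition, serialize the rows and hand them to a producer, one message per row. A minimal std-only sketch of that core loop, with an in-memory test double standing in for a real Kafka producer (rdkafka in the actual code); the rows here are pre-rendered JSON strings rather than Arrow record batches:

```rust
use std::sync::Mutex;

// Stand-in for a Kafka producer interface.
trait Producer {
    fn send(&self, topic: &str, payload: &str);
}

// Test double that collects messages instead of talking to a broker.
#[derive(Default)]
struct MockProducer {
    sent: Mutex<Vec<(String, String)>>,
}

impl Producer for MockProducer {
    fn send(&self, topic: &str, payload: &str) {
        self.sent.lock().unwrap().push((topic.into(), payload.into()));
    }
}

// The shape of the publish step: each executor/partition serializes its
// rows and sends them one message per row.
fn publish_partition(producer: &dyn Producer, topic: &str, rows: &[String]) {
    for row in rows {
        producer.send(topic, row);
    }
}

fn main() {
    let producer = MockProducer::default();
    let rows = vec![
        "{\"a\":1,\"bmin\":10}".to_string(),
        "{\"a\":2,\"bmin\":20}".to_string(),
    ];
    publish_partition(&producer, "test_topic", &rows);
    println!("{} messages sent", producer.sent.lock().unwrap().len()); // 2 messages sent
}
```

Swapping the mock for a broker-backed producer is then an isolated change, which also makes the publisher easy to unit-test without a running Kafka.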