[Rust][Datafusion] Dataframe state make public

2023-09-21 Thread Jaroslaw Nowosad
Hi,

Looking for comments/your view:

Would it be possible to:
1. patch DataFusion's DataFrame to make df.state public
2. patch DataFusion to add a method to DataFrame, e.g.:
df.transform_logical_plan(mut self, new_plan) -> df, where the
original plan could be modified / injected with a new plan node
(UserDefinedLogicalNode).

Reason:
I'm working on a "writer to a Kafka topic" on top of DataFusion using
Ballista - to get proper distribution I need to change the DataFrame
output so it is processed/sent on each executor.
To do this I currently need access to both the DataFrame and the
context: I need the state to change the DataFrame on the fly and
inject my own UserDefinedLogicalNode.

The current code works, but looks a little "messy":
df.write(ballista_ctx, "kafka://topic:port?brokers", Format::JSON);

If I had public access to df.state, it would look like:
df.write_json("kafka://topic:port?brokers");
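
For (2), a rough sketch of what I mean, written as a patch inside
DataFusion's own dataframe module (the method name is mine, not an
existing API; `state` is the private field from (1)):

```rust
// Hypothetical addition to datafusion/core/src/dataframe.rs, where
// DataFrame, SessionState and LogicalPlan are already in scope.
impl DataFrame {
    /// Consume the DataFrame and rebuild it around `new_plan`, e.g. the
    /// original plan wrapped in a custom UserDefinedLogicalNode.
    pub fn transform_logical_plan(self, new_plan: LogicalPlan) -> Self {
        DataFrame::new(self.state, new_plan)
    }
}
```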


Cheers,
Jaro


[RUST][Ballista] UDF/UDAF in Ballista

2023-06-27 Thread Jaroslaw Nowosad
Hi,
Quick question: do UDFs/UDAFs work in Ballista?
I saw a "TODO" in the executor code:

```rust
// TODO add logic to dynamically load UDF/UDAFs libs from files
scalar_functions: HashMap::new(),
aggregate_functions: HashMap::new(),
```
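
For the dynamic-loading part of that TODO, I imagine something along
these lines - a sketch using the libloading crate, where the
`register_udfs` entry point and its signature are made up and some such
convention would have to be designed:

```rust
use datafusion::logical_expr::ScalarUDF;
use libloading::{Library, Symbol};

// Load a shared library and call an agreed-upon entry point that hands
// back its UDFs. This only works if the lib was built with the same
// rustc and datafusion versions as the host process.
fn load_udf_lib(path: &str) -> Result<Vec<ScalarUDF>, Box<dyn std::error::Error>> {
    unsafe {
        let lib = Library::new(path)?;
        let udfs = {
            let register: Symbol<unsafe fn() -> Vec<ScalarUDF>> =
                lib.get(b"register_udfs")?;
            register()
        };
        // Keep the library loaded: the returned UDFs point into its code.
        std::mem::forget(lib);
        Ok(udfs)
    }
}
```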

Creating an example library and adding the loading functionality here looks
"simple enough"; however, I don't know how this would work with the client/
scheduler - resolving the logical plan and then converting logical to
physical. Not to mention how it will be passed through gRPC?
I feel confused.

For " hack version" - should I register the udf library in all
client/scheduler/executors?
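
For reference, this is the kind of registration I mean - a minimal
sketch with a made-up `double` UDF (signatures follow a mid-2023
DataFusion and shift between releases). For the hack, the same call
would have to run in the client, the scheduler and every executor so
they all resolve the name:

```rust
use std::sync::Arc;

use datafusion::arrow::array::{ArrayRef, Int64Array};
use datafusion::arrow::datatypes::DataType;
use datafusion::error::DataFusionError;
use datafusion::logical_expr::{create_udf, ColumnarValue, Volatility};
use datafusion::prelude::SessionContext;

// Register a toy scalar UDF `double(x)` on a context.
fn register_udfs(ctx: &SessionContext) {
    let double = Arc::new(|args: &[ColumnarValue]| {
        let ColumnarValue::Array(arr) = &args[0] else {
            return Err(DataFusionError::Execution(
                "scalar input not handled in this sketch".to_string(),
            ));
        };
        let ints = arr
            .as_any()
            .downcast_ref::<Int64Array>()
            .expect("double() expects Int64 input");
        let doubled: Int64Array = ints.iter().map(|v| v.map(|x| x * 2)).collect();
        Ok(ColumnarValue::Array(Arc::new(doubled) as ArrayRef))
    });

    ctx.register_udf(create_udf(
        "double",                  // name used in SQL
        vec![DataType::Int64],     // input types
        Arc::new(DataType::Int64), // return type
        Volatility::Immutable,
        double,
    ));
}
```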

I will appreciate any help / pointers - learning is fun ;-)


Cheers,
Jaro


Re: [RUST][Datafusion][Ballista] Writer

2023-04-02 Thread Jaroslaw Nowosad
Hi,

Thanks for your response.

Sorry - my bad, I didn't specify it clearly. However, I will check your solution.
What I'm looking for is Ballista - I need a distributed version of
export/save. Currently in Ballista you can only read, e.g. from
S3 (MinIO)/HDFS, but after processing I need to save the output ... put
it back to S3.
At the moment I figure the easiest way will probably be by using object_store.
From what I see it should be done by the executors, not the driver - that's
why I started thinking about a logical plan.
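
For reference, the put() part of what I have in mind - a minimal sketch
against the object_store crate, using the local backend so it runs
anywhere (for S3/MinIO you would build an AmazonS3 store instead; the
path and payload are made up, and the API details differ between
object_store releases):

```rust
use bytes::Bytes;
use object_store::{local::LocalFileSystem, path::Path, ObjectStore};

// Write one finished partition back to the store. In the distributed
// case each executor would do this for its own partitions.
#[tokio::main]
async fn main() -> object_store::Result<()> {
    let store = LocalFileSystem::new_with_prefix("/tmp")?;
    let location = Path::from("output/part-0.json");
    let payload = Bytes::from(r#"{"a":1,"bmin":2}"#);
    store.put(&location, payload).await?;
    Ok(())
}
```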

Best Regards,
Jaro


On Sat, Apr 1, 2023 at 9:23 PM Metehan Yıldırım  wrote:
>
> Hi,
>
> As far as I know, exporting data from a SQL database to a CSV file or other
> external file format is typically not considered part of the logical plan
> for executing a SQL query.
>
> At present, I am developing a table sink feature in Datafusion, where I
> have successfully added new APIs (insert_into and copy_to) to the
> TableProvider trait. Although I have not yet submitted the PR, the new APIs
> are functioning well.
>
> Listing table INSERT INTO support (WAITING ARROW / OBJECT STORE UPDATE) by
> metesynnada · Pull Request #62 · synnada-ai/arrow-datafusion (github.com)
> <https://github.com/synnada-ai/arrow-datafusion/pull/62>
>
> It should be noted that the object_store crate is primarily responsible for
> the main functionality of the table sink feature. It provides ample support
> for file sinking related to listing tables. If you are looking for support
> beyond this, I'd like to hear about the use case so I can help more.
>
> Mete.
>
> On Sat, Apr 1, 2023 at 11:07 PM Jaroslaw Nowosad  wrote:
>
> > Hi,
> >
> > Looking for advice:
> > I'm looking into creating a writer part for ballista.
> > There is a data source but not a sink.
> > I started looking into object store -> put/put_multipart.
> > But it looks like a simple context extension is not enough - do I need to
> > extend the logical/physical plan?
> >
> > If you have any pointers...
> >
> > Best Regards,
> > Jaro
> >


[RUST][Datafusion][Ballista] Writer

2023-04-01 Thread Jaroslaw Nowosad
Hi,

Looking for advice:
I'm looking into creating a writer part for ballista.
There is a data source but not a sink.
I started looking into object store -> put/put_multipart.
But it looks like a simple context extension is not enough - do I need to
extend the logical/physical plan?

If you have any pointers...

Best Regards,
Jaro


Re: [RUST][Datafusion] SQL UDF in Datafusion

2023-01-13 Thread Jaroslaw Nowosad
Thanks Jeremy!

Yes, I need some time to dig in - i.e. I need to figure out how to divide my
problem into smaller tasks.
Thanks for DaskParser - this is exactly where I want to start ... probably
with some simple new SQL statement.
I'd really appreciate any details, e.g. I need to find out how to register
functions and retrieve them later.
If you have any more suggestions or thoughts, please share them.
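
To make it concrete for myself, this is the kind of wrapper I picture,
based on the DaskParser approach below - a rough sketch where all names
are mine and the exact sqlparser API differs between versions:

```rust
use sqlparser::ast::Statement;
use sqlparser::dialect::GenericDialect;
use sqlparser::keywords::Keyword;
use sqlparser::parser::{Parser, ParserError};

// Either our custom statement or plain SQL handled by the stock parser.
enum MyStatement {
    CreateSqlFunction { name: String },
    Standard(Statement),
}

fn parse_one(sql: &str) -> Result<MyStatement, ParserError> {
    let dialect = GenericDialect {};
    let mut parser = Parser::new(&dialect).try_with_sql(sql)?;
    // Intercept `CREATE FUNCTION <name> ...`; fall through otherwise.
    if parser.parse_keyword(Keyword::CREATE) && parser.parse_keyword(Keyword::FUNCTION) {
        let name = parser.parse_identifier()?.value;
        // ... parse the parameter list, RETURN type and quoted SQL body
        // here, then stash the definition in a registry keyed by `name` ...
        Ok(MyStatement::CreateSqlFunction { name })
    } else {
        // No custom prefix matched: re-parse from scratch with the stock parser.
        let mut stmts = Parser::parse_sql(&dialect, sql)?;
        Ok(MyStatement::Standard(stmts.remove(0)))
    }
}
```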

Thanks,
Jaro
yare...@gmail.com



On Thu, Jan 12, 2023 at 3:46 PM Jeremy Dyer  wrote:

> Hey Jaro,
>
> While not written in Java, nor a UDF, there are some examples in [1]
> dask-sql (python based) where we do this to extend DataFusion for custom
> grammars, CREATE MODEL, for example. In a nutshell you want to write some
> Rust code that extends the DataFusion parser and then performs any binding
> logic required when your custom UDF statement is encountered. The
> processing chain is a little lengthy to follow but you can see where that
> starts [2] here. The `DaskParser` maintains a member which is the
> DataFusion parser itself. Happy to give more details; I just wanted to
> give you a place to start looking.
>
> Thanks,
> Jeremy Dyer
>
> [1] - https://github.com/dask-contrib/dask-sql
> [2] -
>
> https://github.com/dask-contrib/dask-sql/blob/main/dask_planner/src/parser.rs#L385
>
> On Thu, Jan 12, 2023 at 10:36 AM Jaroslaw Nowosad  wrote:
>
> > Hi all,
> >
> > I had a task to investigate how to extend DataFusion to add UDFs written
> > in plain SQL.
> > Reason behind: there is quite a big bunch of SQL UDFs in existing Java
> > (Spark) solutions; however, we are starting to move into the Rust
> > ecosystem, and DataFusion/Arrow/Ballista looks like the proper way.
> >
> > Question:
> > Could I get some pointers on how to extend DF to add "CREATE FUNCTION AAA
> > (p1:int, p2: int) RETURN INT AS '
> > I saw some rewrite propositions: extending the SQL parser with a new
> > command or creating a separate parser/dialect.
> >
> > Best Regards,
> > Jaro
> >
>


[RUST][Datafusion] SQL UDF in Datafusion

2023-01-12 Thread Jaroslaw Nowosad
Hi all,

I had a task to investigate how to extend DataFusion to add UDFs written in
plain SQL.
Reason behind: there is quite a big bunch of SQL UDFs in existing Java (Spark)
solutions; however, we are starting to move into the Rust ecosystem, and
DataFusion/Arrow/Ballista looks like the proper way.

Question:
Could I get some pointers on how to extend DF to add "CREATE FUNCTION AAA
(p1:int, p2: int) RETURN INT AS '

[Datafusion] Streaming - integration with kafka - kafka_writer

2022-06-24 Thread Jaroslaw Nowosad
Hi,
I am just trying to integrate DataFusion with Kafka; the final goal is to
have end-to-end streaming. But I started from a "different side" -> step 1 is
to publish output to Kafka, so I copied code / created a Kafka publisher:
https://github.com/yarenty/arrow-datafusion/tree/master/datafusion/core/src/physical_plan/kafka

Test case is here:
https://github.com/yarenty/arrow-datafusion/blob/master/datafusion/core/tests/ordered_sql_to_kafka.rs


All finished with something like this:
```rust
use datafusion::error::Result;
use datafusion::prelude::{CsvReadOptions, SessionContext};
// KafkaContext, KafkaConfig and DataFrame::publish_to_kafka come from the
// kafka module in the fork linked above.

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    ctx.register_csv("example", "tests/example.csv", CsvReadOptions::new())
        .await?;

    let df = ctx
        .sql("SELECT a, MIN(b) AS bmin FROM example GROUP BY a ORDER BY a LIMIT 100")
        .await?;

    // kafka context
    let stream_ctx = KafkaContext::with_config(
        KafkaConfig::new("test_topic")
            .set("bootstrap.servers", "127.0.0.1:9092")
            .set("compression.codec", "snappy"),
    );

    df.publish_to_kafka(stream_ctx).await?;

    Ok(())
}
```

Still not sure if this is the correct way to do it, or if I put the code in
the proper places ... still: learning something new every day.

Is there any other place where I can share code / check ideas?

Jaro
yare...@gmail.com