> The alternative path of subclassing SourceNode and having ExecNode::Init or
> ExecNode::StartProducing seems quite a bit of change (also I don't think
> SourceNode is exposed via public header). But let me know if you think I am
> missing something.

Agreed that we don't want to go this route.  David's suggestion is a
good idea.  However, opening the connection and building the reader
shouldn't be left entirely to the caller.

In other words (and my lack of detailed knowledge about flight is
probably going to leak here) there should still be a factory function
(e.g. "flight_source" or something like that) and a custom options
object (FlightSourceOptions).

To start with I think a custom factory function will be sufficient
(e.g. look at MakeScanNode in scanner.cc for an example).  So the
options would somehow describe the coordinates of the flight endpoint.
The factory function would open a connection to the flight endpoint
and convert this into a record batch reader.  Then it would create one
of the nodes that Yaron has contributed and return that.
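
A minimal sketch of that shape, to make the idea concrete.  It uses
stand-in types rather than the real Arrow/Flight/Acero classes, and
every name in it (the FlightSourceOptions fields, FakeFlightStream,
ReaderSourceNode, MakeFlightSource) is hypothetical.  The point is
only that the caller passes options and the factory does the
connecting:

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <optional>
#include <string>
#include <utility>
#include <vector>

// Stand-in types only: none of these are Arrow, Flight, or Acero classes.
using Batch = std::vector<int>;

// Minimal pull-style reader, playing the role of a record batch reader.
struct BatchReader {
  virtual ~BatchReader() = default;
  // Returns the next batch, or std::nullopt once the stream is exhausted.
  virtual std::optional<Batch> ReadNext() = 0;
};

// Hypothetical options object: the "coordinates" of the flight endpoint.
struct FlightSourceOptions {
  std::string location;  // e.g. "grpc://host:port" (made-up example)
  std::string ticket;    // identifies the stream to fetch
};

// Fake stream standing in for what a DoGet call would return.
class FakeFlightStream : public BatchReader {
 public:
  explicit FakeFlightStream(std::vector<Batch> batches)
      : batches_(std::move(batches)) {}
  std::optional<Batch> ReadNext() override {
    if (next_ >= batches_.size()) return std::nullopt;
    return batches_[next_++];
  }

 private:
  std::vector<Batch> batches_;
  std::size_t next_ = 0;
};

// Stand-in for a source node that is built from a reader.
class ReaderSourceNode {
 public:
  explicit ReaderSourceNode(std::unique_ptr<BatchReader> reader)
      : reader_(std::move(reader)) {}
  // Drains the reader and returns everything it produced.
  std::vector<Batch> Run() {
    std::vector<Batch> out;
    while (auto batch = reader_->ReadNext()) out.push_back(*batch);
    return out;
  }

 private:
  std::unique_ptr<BatchReader> reader_;
};

// Counts how many times the fake "connection" was opened.
int g_connections_opened = 0;

// The factory: the caller passes options, and the connection is opened
// here, at node creation time, not at execution time.
std::unique_ptr<ReaderSourceNode> MakeFlightSource(
    const FlightSourceOptions& options) {
  assert(!options.location.empty());
  ++g_connections_opened;  // pretend to connect and issue DoGet
  auto reader = std::make_unique<FakeFlightStream>(
      std::vector<Batch>{{1, 2}, {3}});
  return std::make_unique<ReaderSourceNode>(std::move(reader));
}
```

Note that in this sketch the connection is already open by the time
MakeFlightSource returns, before anything has been executed.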

However, it might be nice if "open a connection to the flight
endpoint" happened during the call to StartProducing and not during
the factory function call.  This could maybe be a follow-up task.
Perhaps source node could change so that, instead of accepting an
AsyncGenerator, it accepts an AsyncGenerator factory function.  Then
it could execute that function during the call to StartProducing.
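
Roughly, and again with stand-in types instead of the real Acero
classes (DeferredSourceNode, Generator, GeneratorFactory and
MakeFakeFlightFactory are hypothetical names, and the generator here
is synchronous for simplicity where the real one would be async), the
deferred variant could look something like this:

```cpp
#include <cassert>
#include <functional>
#include <optional>
#include <utility>
#include <vector>

// Stand-in types only: these model the idea and are not Acero classes.
using Batch = std::vector<int>;
// A generator yields the next batch, or std::nullopt at end of stream.
using Generator = std::function<std::optional<Batch>()>;
// A generator factory builds the generator (e.g. by opening a connection).
using GeneratorFactory = std::function<Generator()>;

class DeferredSourceNode {
 public:
  // Construction only stores the factory; nothing is opened yet.
  explicit DeferredSourceNode(GeneratorFactory factory)
      : factory_(std::move(factory)) {}

  // The factory runs here, so any connection is opened at execution time.
  void StartProducing() {
    assert(!generator_ && "StartProducing must be called only once");
    generator_ = factory_();
  }

  // Pulls every batch from the generator created in StartProducing.
  std::vector<Batch> Drain() {
    std::vector<Batch> out;
    while (auto batch = generator_()) out.push_back(*batch);
    return out;
  }

 private:
  GeneratorFactory factory_;
  Generator generator_;  // empty until StartProducing is called
};

// Hypothetical factory that "connects" lazily and then yields `count`
// batches. `connections_opened` lets a caller observe when that happens.
GeneratorFactory MakeFakeFlightFactory(int count, int* connections_opened) {
  return [count, connections_opened]() -> Generator {
    ++*connections_opened;  // pretend to open the connection and call DoGet
    int next = 0;
    return [count, next]() mutable -> std::optional<Batch> {
      if (next >= count) return std::nullopt;
      return Batch{next++};
    };
  };
}
```

Here constructing the node leaves the connection counter untouched; it
only increments once StartProducing is called.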

On Tue, Sep 13, 2022 at 4:05 PM Li Jin <ice.xell...@gmail.com> wrote:
>
> Thanks Yaron for the pointer to that PR.
>
> On Tue, Sep 13, 2022 at 4:43 PM Yaron Gvili <rt...@hotmail.com> wrote:
>
> > If you can wrap the flight reader as a RecordBatchReader, then another
> > possibility is using an upcoming PR (
> > https://github.com/apache/arrow/pull/14041) that enables SourceNode to
> > accept it. You would need to know the schema when configuring the
> > SourceNode, but you won't need to derive from SourceNode.
> >
> >
> > Yaron.
> > ________________________________
> > From: Li Jin <ice.xell...@gmail.com>
> > Sent: Tuesday, September 13, 2022 3:58 PM
> > To: dev@arrow.apache.org <dev@arrow.apache.org>
> > Subject: Re: Integration between Flight and Acero
> >
> > Update:
> >
> > I am going to try what David Li suggested here:
> > https://lists.apache.org/thread/8yfvvyyc79m11z9wql0gzdr25x4b3g7v
> >
> > This seems to be the least amount of code. This does require calling
> > "DoGet" at Acero plan/node creation time rather than execution time but I
> > don't think it's a big deal for now.
> >
> > The alternative path of subclassing SourceNode and having ExecNode::Init or
> > ExecNode::StartProducing seems quite a bit of change (also I don't think
> > SourceNode is exposed via public header). But let me know if you think I am
> > missing something.
> >
> > Li
> >
> > On Tue, Sep 6, 2022 at 4:57 AM Yaron Gvili <rt...@hotmail.com> wrote:
> >
> > > Hi Li,
> > >
> > > Here's my 2 cents about the Ibis/Substrait part of this.
> > >
> > > An Ibis expression carries a schema. If you're planning to create an
> > > integrated Ibis/Substrait/Arrow solution, then you'll need the schema
> > > to be available to Ibis in Python. So, you'll need a Python wrapper for
> > > the C++ implementation you have in mind for the GetSchema method. I
> > > think you should pass the schema obtained by (the wrapped) GetSchema to
> > > an Ibis node, rather than defining a new Ibis node that would have to
> > > access the network to get the schema on its own.
> > >
> > > Given the above, I agree with you that when the Acero node is created its
> > > schema would already be known.
> > >
> > >
> > > Yaron.
> > > ________________________________
> > > From: Li Jin <ice.xell...@gmail.com>
> > > Sent: Thursday, September 1, 2022 2:49 PM
> > > To: dev@arrow.apache.org <dev@arrow.apache.org>
> > > Subject: Re: Integration between Flight and Acero
> > >
> > > Thanks David. I think my original question might not have been accurate
> > > so I will try to rephrase my question:
> > >
> > > My ultimate goal is to add an ibis source node:
> > >
> > > class MyStorageTable(ibis.TableNode, sch.HasSchema):
> > >     url = ... # e.g. "my_storage://my_path"
> > >     begin = ... # e.g. "20220101"
> > >     end = ... # e.g. "20220201"
> > >
> > > and pass it to Acero and have Acero create a source node that knows how
> > > to read from my_storage. Currently, I have a C++ class that looks like
> > > this that knows how to read/write data:
> > >
> > > class MyStorageClient {
> > >  public:
> > >   /// \brief Construct a client
> > >   MyStorageClient(const std::string& service_location);
> > >
> > >   /// \brief Read data from a table streamingly
> > >   /// \param[in] table_uri
> > >   /// \param[in] start_time The start time (inclusive), e.g., '20100101'
> > >   /// \param[in] end_time The end time (exclusive), e.g., '20100110'
> > >   arrow::Result<std::unique_ptr<arrow::flight::FlightStreamReader>>
> > >   ReadStream(const std::string& table_uri,
> > >              const std::string& start_time,
> > >              const std::string& end_time);
> > >
> > >   /// \brief Write data to a table streamingly
> > >   /// This method will return a FlightStreamWriter that can be used
> > >   /// for streaming data into
> > >   /// \param[in] table_uri
> > >   /// \param[in] start_time The start time (inclusive), e.g., '20100101'
> > >   /// \param[in] end_time The end time (exclusive), e.g., '20100110'
> > >   arrow::Result<DoPutResult> WriteStream(
> > >       const std::string& table_uri,
> > >       const std::shared_ptr<arrow::Schema>& schema,
> > >       const std::string& start_time,
> > >       const std::string& end_time);
> > >
> > >   /// \brief Get schema of a table.
> > >   /// \param[in] table The Smooth table name, e.g.,
> > >   /// smooth:/research/user/ljin/test
> > >   arrow::Result<std::shared_ptr<arrow::Schema>> GetSchema(
> > >       const std::string& table_uri);
> > > };
> > >
> > > I think an Acero node's schema must be known when the node is created,
> > > so I'd imagine I would implement MyStorageExecNode that gets created by
> > > SubstraitConsumer (via some registration mechanism in SubstraitConsumer):
> > >
> > > (1) GetSchema is called in SubstraitConsumer when creating the node
> > > (network call to the storage backend to get schema)
> > > (2) ReadStream is called in either ExecNode::Init or
> > > ExecNode::StartProducing to create the FlightStreamReader
> > > (3) Some thread (either the Plan's execution thread or the thread owned
> > > by MyStorageExecNode) will read from FlightStreamReader and send data
> > > downstream.
> > >
> > > Does that sound like the right approach or is there some other way I
> > > should do this?
> > >
> > > On Wed, Aug 31, 2022 at 6:16 PM David Li <lidav...@apache.org> wrote:
> > >
> > > > Hi Li,
> > > >
> > > > It'd depend on how exactly you expect everything to fit together, and
> > > > I think the way you'd go about it would depend on what exactly the
> > > > application is. For instance, you could have the application code do
> > > > everything up through DoGet and get a reader, then create a SourceNode
> > > > from the reader and continue from there.
> > > >
> > > > Otherwise, I would think the way to go would be to be able to create a
> > > > node from a FlightDescriptor (which would contain the URL/parameters in
> > > > your example). In that case, I think it'd fit into Arrow Dataset, under
> > > > ARROW-10524 [1]. In that case, I'd equate GetFlightInfo to dataset
> > > > discovery, and each FlightEndpoint in the FlightInfo to a Fragment. As
> > > > a bonus, there's already good integration between Dataset and Acero and
> > > > this should naturally do things like read the FlightEndpoints in
> > > > parallel with readahead and so on.
> > > >
> > > > That means: you'd start with the FlightDescriptor, and create a Dataset
> > > > from it. This will call GetFlightInfo under the hood. (There's a minor
> > > > catch here: this assumes the service that returns the FlightInfo can
> > > > embed an accurate schema into it. If that's not true, there'll have to
> > > > be some finagling with various ways of getting the actual schema,
> > > > depending on what exactly your service supports.) Once you have a
> > > > Dataset, you can create an ExecPlan and proceed like normal.
> > > >
> > > > Of course, if you then want to get things into Python, R, Substrait,
> > > > etc... that requires some more work - especially for Substrait where
> > > > I'm not sure how best to encode a custom source like that.
> > > >
> > > > [1]: https://issues.apache.org/jira/browse/ARROW-10524
> > > >
> > > > -David
> > > >
> > > > On Wed, Aug 31, 2022, at 17:09, Li Jin wrote:
> > > > > Hello!
> > > > >
> > > > > I have recently started to look into integrating Flight RPC with
> > > > > Acero source/sink node.
> > > > >
> > > > > In Flight, the life cycle of a "read" request looks something like:
> > > > >
> > > > >    - User specifies a URL (e.g. my_storage://my_path) and parameters
> > > > >    (e.g., begin = "20220101", end = "20220201")
> > > > >    - Client issues GetFlightInfo and gets FlightInfo from server
> > > > >    - Client issues DoGet with the FlightInfo and gets a stream reader
> > > > >    - Client calls Next until stream is exhausted
> > > > >
> > > > > My question is, how does the above life cycle fit in an Acero node?
> > > > > In other words, what are the proper places in Acero node lifecycle to
> > > > > issue the corresponding flight RPC?
> > > > >
> > > > > Appreciate any thoughts,
> > > > > Li
> > > >
> > >
> >
