I think the ExecPlan itself would probably need some changes.  Right
now each node has an output schema.  Most of the node implementations
depend on this in some way or another.

For example, a filter node binds the expression to the schema once at
plan construction time.  If the schema is variable what happens to the
expression "x > 3" when a batch comes by that has no "x" column?  What
happens if a batch comes by where the "x" column is a string and not a
number?  You could probably adapt some logic here but it doesn't exist
today.  The projection node will have pretty much the same dilemma.

The aggregate nodes will run into similar problems (How do you build
an effective hash map if your join column "group_id" is sometimes a
string and sometimes an integer?)

Perhaps it would be easier to think about it as having some minimal
schema with "extra" (e.g. in structured logging you always have
timestamp, level, and body, then everything else could just be extra).
Only the minimal schema can be used for the actual compute operations
while the rest is just carried along and processed by the user at the
end.

Another approach could be to adapt the scanner so that it can handle
outputting batches with different schemas.  If all you are doing is
custom iterative processing (as opposed to running traditional
relational algebra operators) the scanner should be enough.  You can
get an iterator of batches and setup your own custom processing
pipeline from that.

These are just ideas from the hip.  I think most relational algebra
systems rely on fixed schemas, and I would worry a little about the
additional complexity of trying to fight against that.

-Weston

On Mon, Nov 15, 2021 at 1:11 AM Yue Ni <niyue....@gmail.com> wrote:
>
> > This schema evolution work has not been done in the scanner yet so if
> > you are interested you might want to look at ARROW-11003[1].
>
> Thanks. I will keep an eye on it.
>
> > Or did you have a different use case for multiple schemas in mind that
> > doesn't quite fit the "promote to common schema" case?
>
> I think my use case is not exactly the same as the dataset scanner use
> case. In my case, the data set could be a set of JSON files (JSON lines
> actually, https://jsonlines.org) that are essentially structural logs from
> other applications (like this example,
> https://stackify.com/what-is-structured-logging-and-why-developers-need-it/)
> , so the schema may vary a lot from app to app. Currently we have a
> home-brewed compute engine leveraging arrow as data format, and I am
> researching if there is any chance we could leverage arrow's new C++
> compute engine, probably writing some custom execution operators to make it
> work.
>
> On Mon, Nov 15, 2021 at 4:12 PM Weston Pace <weston.p...@gmail.com> wrote:
>
> > > "The consumer of a Scan does not need to know how it is implemented,
> > > only that a uniform API is provided to obtain the next RecordBatch
> > > with a known schema.", I interpret this as `Scan` operator may
> > > produce multiple RecordBatches, and each of them should have a known
> > > schema, but next batch's schema could be different with the previous
> > > batch. Is this understanding correct?
> >
> > No.  The compute/query engine expects that all batches share the same
> > schema.
> >
> > However, it is often the case that files in a dataset have different
> > schemas.  One way to handle this is to "promote up" to the most common
> > data type for a given column.  For example, if some columns are int32
> > and some are int64 then promote all to int64.  "schema evolution" is
> > one term I've seen thrown around for this process.
> >
> > I believe the plan is to eventually have this promotion happen during
> > the scanning process.  So the scanner has to deal with fragments that
> > may have differing schemas but by the time the data reaches the query
> > engine the record batches all have the consistent common schema.
> >
> > This schema evolution work has not been done in the scanner yet so if
> > you are interested you might want to look at ARROW-11003[1].  Or did
> > you have a different use case for multiple schemas in mind that
> > doesn't quite fit the "promote to common schema" case?
> >
> > [1] https://issues.apache.org/jira/browse/ARROW-11003
> >
> >
> > On Sun, Nov 14, 2021 at 7:03 PM Yue Ni <niyue....@gmail.com> wrote:
> > >
> > > Hi there,
> > >
> > > I am evaluating Apache Arrow C++ compute engine for my project, and
> > wonder
> > > what the schema assumption is for execution operators in the compute
> > > engine.
> > >
> > > In my use case, multiple record batches for computation may have
> > different
> > > schemas. I read the Apache Arrow Query Engine for C++ design doc (
> > >
> > https://docs.google.com/document/d/10RoUZmiMQRi_J1FcPeVAUAMJ6d_ZuiEbaM2Y33sNPu4
> > ),and
> > > for the `Scan` operator, it is said "The consumer of a Scan does not need
> > > to know how it is implemented, only that a uniform API is provided to
> > > obtain the next RecordBatch with a known schema.", I interpret this as
> > > `Scan` operator may produce multiple RecordBatches, and each of them
> > should
> > > have a known schema, but next batch's schema could be different with the
> > > previous batch. Is this understanding correct?
> > >
> > > And I read arrow's source code, in `exec_plan.h`:
> > > ```
> > > class ARROW_EXPORT ExecNode {
> > > ...
> > > /// The datatypes for batches produced by this node
> > >   const std::shared_ptr<Schema>& output_schema() const { return
> > > output_schema_; }
> > > ...
> > > ```
> > > It looks like each `ExecNode` needs to provide an `output_schema`. Is it
> > > allowed to return `output_schema` that may change during ExecNode's
> > > execution? If I would like to implement an execution node that will
> > produce
> > > multiple batches that may have different schemas, is this feasible within
> > > Arrow C++ compute engine API framework? Thanks.
> > >
> > > Regards,
> > > Yue
> >

Reply via email to