Let me try to summarize:

Ideally, the data source API should deal only with data, not metadata. But one
key problem is that Spark still needs to support data sources without a
metastore, e.g. file format data sources.

For this kind of data source, users have to pass metadata such as
partitioning/bucketing to every write action on a "table" (or another
identifier, like the path of a file format data source), and it's the user's
responsibility to make sure this metadata is consistent across writes. If
it's inconsistent, the behavior is undefined; different data sources may
behave differently.

If we agree on this, then the data source write API should have a way to pass
this metadata, and I think using data source options is a good choice because
it keeps the metadata implicit and doesn't require new APIs.
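
To make the options idea concrete, here is a rough Python sketch of how
partitioning/bucketing could be flattened into a string-to-string map and
recovered on the data source side. The option keys (`partitionColumns`,
`bucketSpec`) and the helper names are made up for illustration; they are not
actual Spark option names or APIs.

```python
import json

# Hypothetical option keys, chosen only for this sketch.
PARTITION_KEY = "partitionColumns"
BUCKET_KEY = "bucketSpec"


def encode_write_options(path, partition_cols, num_buckets=None, bucket_cols=()):
    """Flatten write metadata into a string-to-string options map."""
    options = {"path": path, PARTITION_KEY: json.dumps(list(partition_cols))}
    if num_buckets is not None:
        options[BUCKET_KEY] = json.dumps(
            {"numBuckets": num_buckets, "columns": list(bucket_cols)}
        )
    return options


def decode_partition_columns(options):
    """What a data source could do on its side to recover the partition columns."""
    return json.loads(options.get(PARTITION_KEY, "[]"))


opts = encode_write_options("/data/events", ["year", "month"], 8, ["user_id"])
print(decode_partition_columns(opts))
```

Every value stays a plain string, so nothing new is needed at the API level,
and a data source that doesn't care about these keys can simply ignore them.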

But then we have another problem: how should we define the behavior for data
sources with a metastore when the given options contain metadata?
A typical case is `DataFrameWriter.saveAsTable`: when a user calls it with
partition columns, they don't know what will happen. The table may not
exist, in which case they may create it successfully with the specified
partition columns, or the table may already exist with inconsistent partition
columns, in which case Spark throws an exception. Besides, save mode doesn't
play well in this case, as we may need different save modes for data and
metadata.
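
The two outcomes can be modeled with a toy sketch. This is not Spark's actual
code path: `catalog` is just a dict standing in for a metastore, and
`save_as_table` and `PartitionMismatch` are hypothetical names.

```python
class PartitionMismatch(Exception):
    """Raised when the requested partitioning conflicts with the stored one."""


def save_as_table(catalog, name, partition_cols):
    """Toy model: `catalog` maps a table name to its partition columns."""
    requested = list(partition_cols)
    if name not in catalog:
        # Table does not exist yet: the write implicitly creates it.
        catalog[name] = requested
        return "created"
    if catalog[name] != requested:
        # Table exists with a different layout: reject the write.
        raise PartitionMismatch(
            f"table {name!r} is partitioned by {catalog[name]}, not {requested}"
        )
    return "appended"


catalog = {}
print(save_as_table(catalog, "events", ["year"]))  # first call creates the table
print(save_as_table(catalog, "events", ["year"]))  # same layout, data is appended
```

The caller cannot tell from the call site alone which branch will run; that
depends entirely on what is already in the metastore.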

My proposal: the data source API should focus only on data, but concrete data
sources can implement some dirty features via options, e.g. file format
data sources can take partitioning/bucketing from options, and a data source
with a metastore can use a special flag in options to indicate a create table
command (without writing data).

In other words, Spark connects users to data sources with a clean protocol
that focuses only on data, but this protocol has a backdoor: the data source
options. Concrete data sources are free to define how to deal with
metadata, e.g. the Cassandra data source can ask users to create the table on
the Cassandra side first and then write data on the Spark side, or ask users
to provide more details in options and do CTAS on the Spark side. These can
all be done via options.
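
As an illustration of this backdoor, here is a toy Python sketch of a data
source with a metastore that reads a create-only flag from the options and
otherwise requires the table to exist already. The flag name
`createTableOnly`, the `table` option, and `handle_write` are all
hypothetical; a real data source would define its own conventions.

```python
def handle_write(options, rows, tables):
    """Toy write entry point; `tables` maps a table name to its stored rows."""
    table = options["table"]
    create_only = options.get("createTableOnly", "false").lower() == "true"
    if create_only:
        # Metadata-only request: create the table if needed, write no data.
        tables.setdefault(table, [])
        return 0
    if table not in tables:
        # This source asks users to create the table first.
        raise LookupError(f"table {table!r} does not exist")
    tables[table].extend(rows)
    return len(rows)


tables = {}
handle_write({"table": "events", "createTableOnly": "true"}, [], tables)
print(handle_write({"table": "events"}, [{"id": 1}, {"id": 2}], tables))
```

Spark itself never interprets the flag here; it only hands the options map
to the data source, which decides what the flag means.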

After catalog federation, hopefully only file format data sources still use
this backdoor.


On Tue, Sep 26, 2017 at 8:52 AM, Wenchen Fan <cloud0...@gmail.com> wrote:

> > I think it is a bad idea to let this problem leak into the new storage
> API.
>
> Well, I think using data source options is a good compromise for this. We
> can't avoid this problem until catalog federation is done, and this may not
> happen within Spark 2.3, but we definitely need a data source write API in
> Spark 2.3.
>
> > Why can't we use an in-memory catalog to store the configuration of
> HadoopFS tables?
>
> We still need to support existing Spark applications which have
> `df.write.partitionBy(...).parquet(...)`. And I think it's similar to
> `DataFrameWriter.path`: according to your theory, we should not leak `path`
> to the storage API either, but we don't have other solutions for Hadoop FS
> data sources.
>
>
> Eventually I think only Hadoop FS data sources need to take these special
> options, but for now data sources that want to support
> partitioning/bucketing need to take these special options too.
>
>
> On Tue, Sep 26, 2017 at 4:36 AM, Ryan Blue <rb...@netflix.com> wrote:
>
>> I think it is a bad idea to let this problem leak into the new storage
>> API. By not setting the expectation that metadata for a table will exist,
>> this will needlessly complicate writers just to support the existing
>> problematic design. Why can't we use an in-memory catalog to store the
>> configuration of HadoopFS tables? I see no compelling reason why this needs
>> to be passed into the V2 write API.
>>
>> If this is limited to an implementation hack for the Hadoop FS writers,
>> then I guess that's not terrible. I just don't understand why it is
>> necessary.
>>
>> On Mon, Sep 25, 2017 at 11:26 AM, Wenchen Fan <cloud0...@gmail.com>
>> wrote:
>>
>>> Catalog federation means publishing the Spark catalog API (kind of a data
>>> source API for metadata), so that Spark is able to read/write metadata from
>>> external systems. (SPARK-15777)
>>>
>>> Currently Spark can only read/write the Hive metastore, which means for
>>> other systems like Cassandra, we can only implicitly create tables with
>>> the data source API.
>>>
>>> Again, this is not ideal but just a workaround until we finish catalog
>>> federation. That's why the save mode description mostly refers to how data
>>> will be handled instead of metadata.
>>>
>>> Because of this, I think we still need to pass metadata like
>>> partitioning/bucketing to the data source write API. And I propose to use
>>> data source options so that it's not at API level and we can easily ignore
>>> these options in the future if catalog federation is done.
>>>
>>> The same thing applies to Hadoop FS data sources: we need to pass
>>> metadata to the writer anyway.
>>>
>>>
>>>
>>> On Tue, Sep 26, 2017 at 1:08 AM, Ryan Blue <rb...@netflix.com> wrote:
>>>
>>>> > However, without catalog federation, Spark doesn’t have an API to ask
>>>> > an external system(like Cassandra) to create a table. Currently it’s all
>>>> > done by data source write API. Data source implementations are
>>>> > responsible to create or insert a table according to the save mode.
>>>>
>>>> What’s catalog federation? Is there a SPIP for it? It sounds
>>>> straight-forward based on your comments, but I’d rather make sure we’re
>>>> talking about the same thing.
>>>>
>>>> What I’m proposing doesn’t require a change to either the public API,
>>>> nor does it depend on being able to create tables. Why do writers
>>>> necessarily need to create tables? I think other components (e.g. a
>>>> federated catalog) should manage table creation outside of this
>>>> abstraction. Just because data sources currently create tables doesn’t mean
>>>> that we are tied to that implementation.
>>>>
>>>> I would also disagree that data source implementations are responsible
>>>> for creating or inserting according to save mode. The modes are “append”,
>>>> “overwrite”, “failIfExists” and “ignore”, and the descriptions indicate to
>>>> me that the mode refers to how *data* will be handled, not table
>>>> metadata. Overwrite’s docs
>>>> <https://github.com/apache/spark/blob/master/sql/core/src/main/java/org/apache/spark/sql/SaveMode.java#L37>
>>>> state that “existing *data* is expected to be overwritten.”
>>>>
>>>> Save mode currently introduces confusion because it isn’t clear whether
>>>> the mode applies to tables or to writes. In Hive, overwrite removes
>>>> conflicting partitions, but I think the Hadoop FS relations will delete
>>>> tables. We get around this some by using external tables and preserving
>>>> data, but this is an area where we should have clear semantics for external
>>>> systems like Cassandra. I’d like to see a cleaner public API that separates
>>>> these concerns, but that’s a different discussion. For now, I don’t think
>>>> requiring that a table exists is unreasonable. If a table has no metastore
>>>> (Hadoop FS tables) then we can just pass the table metadata in when
>>>> creating the writer since there is no existence in this case.
>>>>
>>>> rb
>>>>
>>>>
>>>> On Sun, Sep 24, 2017 at 7:17 PM, Wenchen Fan <cloud0...@gmail.com>
>>>> wrote:
>>>>
>>>>> I agree it would be a clean approach if a data source were only
>>>>> responsible for writing into an already-configured table. However, without
>>>>> catalog federation, Spark doesn't have an API to ask an external
>>>>> system (like Cassandra) to create a table. Currently it's all done by the
>>>>> data source write API. Data source implementations are responsible for
>>>>> creating a table or inserting into it according to the save mode.
>>>>>
>>>>> As a workaround, I think it's acceptable to pass
>>>>> partitioning/bucketing information via data source options, and data
>>>>> sources should decide whether to take this information and create the
>>>>> table, or throw an exception if it doesn't match the already-configured
>>>>> table.
>>>>>
>>>>>
>>>>> On Fri, Sep 22, 2017 at 9:35 AM, Ryan Blue <rb...@netflix.com> wrote:
>>>>>
>>>>>> > input data requirement
>>>>>>
>>>>>> Clustering and sorting within partitions are a good start. We can
>>>>>> always add more later when they are needed.
>>>>>>
>>>>>> The primary use case I'm thinking of for this is partitioning and
>>>>>> bucketing. If I'm implementing a partitioned table format, I need to tell
>>>>>> Spark to cluster by my partition columns. Should there also be a way to
>>>>>> pass those columns separately, since they may not be stored in the same
>>>>>> way as partitions are in the current format?
>>>>>>
>>>>>> On Wed, Sep 20, 2017 at 3:10 AM, Wenchen Fan <cloud0...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi all,
>>>>>>>
>>>>>>> I want to have some discussion about the Data Source V2 write path
>>>>>>> before starting a vote.
>>>>>>>
>>>>>>> The Data Source V1 write path asks implementations to write a
>>>>>>> DataFrame directly, which is painful:
>>>>>>> 1. Exposing upper-level API like DataFrame to Data Source API is not
>>>>>>> good for maintenance.
>>>>>>> 2. Data sources may need to preprocess the input data before
>>>>>>> writing, like cluster/sort the input by some columns. It's better to do
>>>>>>> the preprocessing in Spark instead of in the data source.
>>>>>>> 3. Data sources need to take care of transactions themselves, which
>>>>>>> is hard. And different data sources may come up with very similar
>>>>>>> approaches to transactions, which leads to a lot of duplicated code.
>>>>>>>
>>>>>>>
>>>>>>> To solve these pain points, I'm proposing a data source writing
>>>>>>> framework which is very similar to the reading framework, i.e.,
>>>>>>> WriteSupport -> DataSourceV2Writer -> WriteTask -> DataWriter. You can
>>>>>>> take a look at my prototype to see what it looks like:
>>>>>>> https://github.com/apache/spark/pull/19269
>>>>>>>
>>>>>>> There are some other details that need further discussion:
>>>>>>> 1. *partitioning/bucketing*
>>>>>>> Currently only the built-in file-based data sources support them,
>>>>>>> but there is nothing stopping us from exposing them to all data sources.
>>>>>>> One question is, shall we make them mix-in interfaces for the data
>>>>>>> source v2 reader/writer, or just encode them into data source options (a
>>>>>>> string-to-string map)? Ideally they are more like options: Spark just
>>>>>>> passes this user-given information to data sources, and doesn't do
>>>>>>> anything with it.
>>>>>>>
>>>>>>> 2. *input data requirement*
>>>>>>> Data sources should be able to ask Spark to preprocess the input
>>>>>>> data, and this can be a mix-in interface for DataSourceV2Writer. I
>>>>>>> think we need to add a clustering request and a
>>>>>>> sorting-within-partitions request. Any more?
>>>>>>>
>>>>>>> 3. *transaction*
>>>>>>> I think we can just follow `FileCommitProtocol`, which is the
>>>>>>> internal framework Spark uses to guarantee transactional writes for
>>>>>>> built-in file-based data sources. Generally speaking, we need task-level
>>>>>>> and job-level commit/abort. Again, you can see more details in my
>>>>>>> prototype: https://github.com/apache/spark/pull/19269
>>>>>>>
>>>>>>> 4. *data source table*
>>>>>>> This is the trickiest one. In Spark you can create a table which
>>>>>>> points to a data source, so you can read/write this data source easily
>>>>>>> by referencing the table name. Ideally a data source table is just a
>>>>>>> pointer to a data source with a list of predefined options, to save
>>>>>>> users from typing these options again and again for each query.
>>>>>>> If that's all, then everything is good, and we don't need to add more
>>>>>>> interfaces to Data Source V2. However, data source tables provide
>>>>>>> special operators like ALTER TABLE SCHEMA, ADD PARTITION, etc., which
>>>>>>> require data sources to have some extra abilities.
>>>>>>> Currently these special operators only work for built-in file-based
>>>>>>> data sources, and I don't think we will extend them in the near future,
>>>>>>> so I propose to mark them as out of scope.
>>>>>>>
>>>>>>>
>>>>>>> Any comments are welcome!
>>>>>>> Thanks,
>>>>>>> Wenchen
>>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>> Ryan Blue
>>>>>> Software Engineer
>>>>>> Netflix
>>>>>>
>>>>>
>>>>>
>>>>
>>>>
>>>> --
>>>> Ryan Blue
>>>> Software Engineer
>>>> Netflix
>>>>
>>>
>>>
>>
>>
>> --
>> Ryan Blue
>> Software Engineer
>> Netflix
>>
>
>
