> Spark doesn't know how to create a table in external systems like
Cassandra, and that's why it's currently done inside the data source writer.

This isn't a valid argument for doing this task in the writer for v2. If we
want to fix the problems with v1, we shouldn't continue to mix write
operations with table metadata changes simply because it is more convenient
and requires less refactoring.

I'm proposing that in v2 we move creation of file system tables outside of
the writer, but still in a non-public implementation. Cassandra and other
external stores would behave as they should today and assume the table
exists, or would wait to use the v2 API until there is catalog support.

The important thing is that we don't set a standard that writers can create
tables, which is going to lead to different behavior across implementations
when we have conflicts between an existing table's config and the options
passed into the writer.

> For now, Spark just assumes data source writer takes care of it. For the
internal file format data source, I propose to pass partition/bucket
information to the writer via options, other data sources can define their
own behavior, e.g. they can also use the options, or disallow users to
write data to a non-existing table and ask users to create the table in the
external systems first.

The point is preventing data sources from defining their own behavior so we
can introduce consistent behavior across sources for v2.

rb

On Thu, Sep 28, 2017 at 8:49 PM, Wenchen Fan <cloud0...@gmail.com> wrote:

> > When this CTAS logical node is turned into a physical plan, the relation
> gets turned into a `DataSourceV2` instance and then Spark gets a writer and
> configures it with the proposed API. The main point of this is to pass the
> logical relation (with all of the user's options) through to the data
> source, not the writer. The data source creates the writer and can tell the
> writer what to do.
>
> Here is the problem: Spark doesn't know how to create a table in external
> systems like Cassandra, and that's why it's currently done inside the data
> source writer.
>
> In the future, we can add a new trait `CatalogSupport` for `DataSourceV2`,
> so that we can use your proposal and separate metadata management from data
> source writer.
>
> For now, Spark just assumes data source writer takes care of it. For the
> internal file format data source, I propose to pass partition/bucket
> information to the writer via options, other data sources can define their
> own behavior, e.g. they can also use the options, or disallow users to
> write data to a non-existing table and ask users to create the table in the
> external systems first.
>
>
>
> On Thu, Sep 28, 2017 at 5:45 AM, Russell Spitzer <
> russell.spit...@gmail.com> wrote:
>
>> On an unrelated note, is there any appetite for making the write path
>> also include an option to return elements that were not
>> able to be processed for some reason.
>>
>> Usage might be like
>>
>> saveAndIgnoreFailures() : Dataset
>>
>> So that if some records cannot be parsed by the datasource for writing,
>> or violate some contract with the datasource the records can be returned
>> for further processing or dealt with by an alternate system.
>>
>> On Wed, Sep 27, 2017 at 12:40 PM Ryan Blue <rb...@netflix.com.invalid>
>> wrote:
>>
>>> Comments inline. I've written up what I'm proposing with a bit more
>>> detail.
>>>
>>> On Tue, Sep 26, 2017 at 11:17 AM, Wenchen Fan <cloud0...@gmail.com>
>>> wrote:
>>>
>>>> I'm trying to give a summary:
>>>>
>>>> Ideally data source API should only deal with data, not metadata. But
>>>> one key problem is, Spark still need to support data sources without
>>>> metastore, e.g. file format data sources.
>>>>
>>>> For this kind of data sources, users have to pass the metadata
>>>> information like partitioning/bucketing to every write action of a
>>>> "table"(or other identifiers like path of a file format data source), and
>>>> it's user's responsibility to make sure these metadata information are
>>>> consistent. If it's inconsistent, the behavior is undefined, different data
>>>> sources may have different behaviors.
>>>>
>>>
>>> Agreed so far. One minor point is that we currently throws an exception
>>> if you try to configure, for example, partitioning and also use
>>> `insertInto`.
>>>
>>>
>>>> If we agree on this, then data source write API should have a way to
>>>> pass these metadata information, and I think using data source options is
>>>> a good choice because it's the most implicit way and doesn't require new
>>>> APIs.
>>>>
>>>
>>> What I don't understand is why we "can't avoid this problem" unless you
>>> mean the last point, that we have to support this. I don't think that using
>>> data source options is a good choice, but maybe I don't understand the
>>> alternatives. Here's a straw-man version of what I'm proposing so you can
>>> tell me what's wrong with it or why options are a better choice.
>>>
>>> I'm assuming we start with a query like this:
>>> ```
>>> df.write.partitionBy("utc_date").bucketBy("primary_key").
>>> format("parquet").saveAsTable("s3://bucket/path/")
>>> ```
>>>
>>> That creates a logical node, `CreateTableAsSelect`, with some options.
>>> It would contain a `Relation` (or `CatalogTable` definition?) that
>>> corresponds to the user's table name and `partitionBy`, `format`, etc.
>>> calls. It should also have a write mode and the logical plan for `df`.
>>>
>>> When this CTAS logical node is turned into a physical plan, the relation
>>> gets turned into a `DataSourceV2` instance and then Spark gets a writer and
>>> configures it with the proposed API. The main point of this is to pass the
>>> logical relation (with all of the user's options) through to the data
>>> source, not the writer. The data source creates the writer and can tell the
>>> writer what to do. Another benefit of this approach is that the relation
>>> gets resolved during analysis, when it is easy to add sorts and other
>>> requirements to the logical plan.
>>>
>>> If we were to implement what I'm suggesting, then we could handle
>>> metadata conflicts outside of the `DataSourceV2Writer`, in the data source.
>>> That eliminates problems about defining behavior when there are conflicts
>>> (the next point) and prepares implementations for a catalog API that would
>>> standardize how those conflicts are handled. In the short term, this
>>> doesn't have to be in a public API yet. It can be special handling for
>>> HadoopFS relations that we can eventually use underneath a public API.
>>>
>>> Please let me know if I've misunderstood something. Now that I've
>>> written out how we could actually implement conflict handling outside of
>>> the writer, I can see that it isn't as obvious of a change as I thought.
>>> But, I think in the long term this would be a better way to go.
>>>
>>>
>>>> But then we have another problem: how to define the behavior for data
>>>> sources with metastore when the given options contain metadata information?
>>>> A typical case is `DataFrameWriter.saveAsTable`, when a user calls it with
>>>> partition columns, he doesn't know what will happen. The table may not
>>>> exist and he may create the table successfully with specified partition
>>>> columns, or the table already exist but has inconsistent partition columns
>>>> and Spark throws exception. Besides, save mode doesn't play well in this
>>>> case, as we may need different save modes for data and metadata.
>>>>
>>>> My proposal: data source API should only focus on data, but concrete
>>>> data sources can implement some dirty features via options. e.g. file
>>>> format data sources can take partitioning/bucketing from options, data
>>>> source with metastore can use a special flag in options to indicate a
>>>> create table command(without writing data).
>>>>
>>>
>>> I can see how this would make changes smaller, but I don't think it is a
>>> good thing to do. If we do this, then I think we will not really accomplish
>>> what we want to with this (a clean write API).
>>>
>>>
>>>> In other words, Spark connects users to data sources with a clean
>>>> protocol that only focus on data, but this protocol has a backdoor: the
>>>> data source options. Concrete data sources are free to define how to deal
>>>> with metadata, e.g. Cassandra data source can ask users to create table at
>>>> Cassandra side first, then write data at Spark side, or ask users to
>>>> provide more details in options and do CTAS at Spark side. These can be
>>>> done via options.
>>>>
>>>> After catalog federation, hopefully only file format data sources still
>>>> use this backdoor.
>>>>
>>>
>>> Why would file format sources use a back door after catalog federation??
>>>
>>> rb
>>>
>>>
>>> --
>>> Ryan Blue
>>> Software Engineer
>>> Netflix
>>>
>>
>


-- 
Ryan Blue
Software Engineer
Netflix

Reply via email to